1 change: 0 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -50,7 +50,6 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
 async-std = { workspace = true, optional = true, features = ["attributes"] }
-async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
148 changes: 100 additions & 48 deletions crates/iceberg/src/arrow/reader.rs
@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
-use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::stream::StreamExt;
-use futures::{try_join, TryFutureExt};
+use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,25 +43,38 @@ use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::runtime::spawn;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, Schema};
+use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};

 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    concurrency_limit_data_files: usize,
 }

 impl ArrowReaderBuilder {
     /// Create a new ArrowReaderBuilder
     pub(crate) fn new(file_io: FileIO) -> Self {
+        let num_cpus = available_parallelism().get();
+
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            concurrency_limit_data_files: num_cpus,
         }
     }

+    /// Sets the max number of in flight data files that are being fetched
+    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
+        self.concurrency_limit_data_files = val;
+
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +87,7 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
         }
     }
 }
@@ -84,73 +97,113 @@ impl ArrowReaderBuilder {
 /// Reads data from Parquet files
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+
+    /// the maximum number of data files that can be fetched at the same time
+    concurrency_limit_data_files: usize,
 }

 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();

-        Ok(try_stream! {
-            while let Some(task_result) = tasks.next().await {
-                match task_result {
-                    Ok(task) => {
-                        // Collect Parquet column indices from field ids
-                        let mut collector = CollectFieldIdVisitor {
-                            field_ids: HashSet::default(),
-                        };
-                        if let Some(predicates) = task.predicate() {
-                            visit(&mut collector, predicates)?;
-                        }
+        let batch_size = self.batch_size;
+        let concurrency_limit_data_files = self.concurrency_limit_data_files;
+
+        let (tx, rx) = channel(concurrency_limit_data_files);
+        let mut channel_for_error = tx.clone();
+
+        spawn(async move {
+            let result = tasks
+                .map(|task| Ok((task, file_io.clone(), tx.clone())))
+                .try_for_each_concurrent(
+                    concurrency_limit_data_files,
+                    |(file_scan_task, file_io, tx)| async move {
+                        match file_scan_task {
+                            Ok(task) => {
+                                let file_path = task.data_file_path().to_string();
+
+                                spawn(async move {
+                                    Self::process_file_scan_task(task, batch_size, file_io, tx)
+                                        .await
+                                })
+                                .await
+                                .map_err(|e| e.with_context("file_path", file_path))
+                            }
+                            Err(err) => Err(err),
+                        }
+                    },
+                )
+                .await;
+
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await;
+            }
+        });
+
+        return Ok(rx.boxed());
+    }
+
+    async fn process_file_scan_task(
+        task: FileScanTask,
+        batch_size: Option<usize>,
+        file_io: FileIO,
+        mut tx: Sender<Result<RecordBatch>>,
+    ) -> Result<()> {
+        // Collect Parquet column indices from field ids
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };

-                        let parquet_file = file_io
-                            .new_input(task.data_file_path())?;
+        if let Some(predicates) = task.predicate() {
+            visit(&mut collector, predicates)?;
+        }

-                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+        let parquet_file = file_io.new_input(task.data_file_path())?;

-                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let arrow_schema = batch_stream_builder.schema();
-                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
-                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+        let mut batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;

-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let arrow_schema = batch_stream_builder.schema();
+        let projection_mask = Self::get_arrow_projection_mask(
+            task.project_field_ids(),
+            task.schema(),
+            parquet_schema,
+            arrow_schema,
+        )?;
+        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

-                        if let Some(row_filter) = row_filter {
-                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                        }
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;

-                        if let Some(batch_size) = self.batch_size {
-                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                        }
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+        }

-                        let mut batch_stream = batch_stream_builder.build()?;
+        if let Some(batch_size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+        }

-                        while let Some(batch) = batch_stream.next().await {
-                            yield batch?;
-                        }
-                    }
-                    Err(e) => {
-                        Err(e)?
-                    }
-                }
-            }
-        }
-        .boxed())
+        let mut batch_stream = batch_stream_builder.build()?;
+
+        while let Some(batch) = batch_stream.try_next().await? {
+            tx.send(Ok(batch)).await?
+        }
+
+        Ok(())
     }

     fn get_arrow_projection_mask(
-        &self,
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-    ) -> crate::Result<ProjectionMask> {
+    ) -> Result<ProjectionMask> {
         if field_ids.is_empty() {
             Ok(ProjectionMask::all())
         } else {
@@ -216,7 +269,6 @@ impl ArrowReader {
     }

     fn get_row_filter(
-        &self,
         predicates: Option<&BoundPredicate>,
         parquet_schema: &SchemaDescriptor,
         collector: &CollectFieldIdVisitor,
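The heart of the new `read` is a bounded fan-out: a fixed-capacity mpsc channel carries results back to the caller, `try_for_each_concurrent` caps how many file-scan tasks are in flight, and a spare clone of the sender (`channel_for_error`) forwards any planning error into the stream so consumers see an `Err` instead of a silently truncated stream. The sketch below reproduces that shape in isolation. It is a minimal sketch, not the crate's code: it assumes a Tokio runtime in place of the crate's runtime-agnostic `spawn`, and `fetch_file` plus the `String` error type are hypothetical stand-ins for `process_file_scan_task` and `iceberg::Error`.

```rust
use futures::channel::mpsc::channel;
use futures::{stream, SinkExt, StreamExt, TryStreamExt};

async fn fetch_file(path: String) -> Result<String, String> {
    // Stand-in for opening a Parquet file and decoding its record batches.
    Ok(format!("batches from {path}"))
}

#[tokio::main]
async fn main() {
    let concurrency_limit = 4;
    // A bounded channel: producers wait once `concurrency_limit` results are
    // queued, so a slow consumer applies backpressure to the fetchers.
    let (tx, rx) = channel::<Result<String, String>>(concurrency_limit);
    let mut channel_for_error = tx.clone();

    let paths: Vec<String> = (0..16).map(|i| format!("file-{i}.parquet")).collect();

    tokio::spawn(async move {
        let result = stream::iter(paths.into_iter().map(Ok::<String, String>))
            .try_for_each_concurrent(concurrency_limit, |path| {
                // Each unit of work gets its own sender clone.
                let mut tx = tx.clone();
                async move {
                    let batches = fetch_file(path).await?;
                    tx.send(Ok(batches)).await.map_err(|e| e.to_string())
                }
            })
            .await;

        // Any error aborts the fan-out; forward it so the output stream
        // terminates with Err rather than hanging or ending early.
        if let Err(e) = result {
            let _ = channel_for_error.send(Err(e)).await;
        }
    });

    // The Receiver is itself a Stream, analogous to ArrowRecordBatchStream.
    let mut results = rx;
    while let Some(item) = results.next().await {
        println!("{item:?}");
    }
}
```

Note that the real change also wraps each `process_file_scan_task` call in a second `spawn`, which moves the CPU-heavy Parquet decoding onto separate executor threads so files are processed in parallel rather than merely interleaved on one task, and tags any failure with the offending file via `with_context("file_path", ...)`.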
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
@@ -47,4 +47,5 @@ pub mod transform;
 mod runtime;

 pub mod arrow;
+mod utils;
 pub mod writer;
34 changes: 24 additions & 10 deletions crates/iceberg/src/scan.rs
@@ -39,6 +39,7 @@ use crate::spec::{
     SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
+use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind, Result};

 /// A stream of [`FileScanTask`].
@@ -55,15 +56,14 @@ pub struct TableScanBuilder<'a> {
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
-    concurrency_limit_manifest_files: usize,
+    concurrency_limit_data_files: usize,
     concurrency_limit_manifest_entries: usize,
+    concurrency_limit_manifest_files: usize,
 }

 impl<'a> TableScanBuilder<'a> {
     pub(crate) fn new(table: &'a Table) -> Self {
-        let num_cpus = std::thread::available_parallelism()
-            .expect("failed to get number of CPUs")
-            .get();
+        let num_cpus = available_parallelism().get();

         Self {
             table,
@@ -72,8 +72,9 @@ impl<'a> TableScanBuilder<'a> {
             batch_size: None,
             case_sensitive: true,
             filter: None,
-            concurrency_limit_manifest_files: num_cpus,
+            concurrency_limit_data_files: num_cpus,
             concurrency_limit_manifest_entries: num_cpus,
+            concurrency_limit_manifest_files: num_cpus,
         }
     }

@@ -124,12 +125,13 @@ impl<'a> TableScanBuilder<'a> {
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
         self.concurrency_limit_manifest_files = limit;
         self.concurrency_limit_manifest_entries = limit;
+        self.concurrency_limit_data_files = limit;
         self
     }

-    /// Sets the manifest file concurrency limit for this scan
-    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
-        self.concurrency_limit_manifest_files = limit;
+    /// Sets the data file concurrency limit for this scan
+    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_data_files = limit;
         self
     }

@@ -139,6 +141,12 @@ impl<'a> TableScanBuilder<'a> {
         self
     }

+    /// Sets the manifest file concurrency limit for this scan
+    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_files = limit;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -244,10 +252,11 @@ impl<'a> TableScanBuilder<'a> {
         Ok(TableScan {
             batch_size: self.batch_size,
             column_names: self.column_names,
-            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             file_io: self.table.file_io().clone(),
             plan_context,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
             concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
+            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
         })
     }
 }
@@ -266,6 +275,10 @@ pub struct TableScan {
     /// The maximum number of [`ManifestEntry`]s that will
     /// be processed in parallel
     concurrency_limit_manifest_entries: usize,
+
+    /// The maximum number of data files that will
+    /// be fetched in parallel
+    concurrency_limit_data_files: usize,
 }

 /// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -350,7 +363,8 @@ impl TableScan {

     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
-        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone());
+        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
+            .with_data_file_concurrency_limit(self.concurrency_limit_data_files);

         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
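For callers, the visible effect of this change is a new knob on the scan builder, plumbed from `TableScanBuilder` through `TableScan::to_arrow` into `ArrowReaderBuilder`. Below is a sketch of how it might be used, assuming a `Table` already loaded from a catalog; the builder methods are the ones visible in this diff, while the surrounding function and the chosen limits are illustrative only.

```rust
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

async fn read_with_limits(table: &Table) -> Result<()> {
    let stream = table
        .scan()
        // Cap manifest-file, manifest-entry, and data-file concurrency in one call...
        .with_concurrency_limit(8)
        // ...then tighten just the data-file stage, e.g. to respect an
        // object-store connection budget.
        .with_data_file_concurrency_limit(2)
        .build()?
        .to_arrow()
        .await?;

    // `to_arrow` yields RecordBatches as soon as any in-flight file produces
    // one; collect them here for simplicity.
    let batches: Vec<_> = stream.try_collect().await?;
    println!("fetched {} record batches", batches.len());
    Ok(())
}
```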
42 changes: 42 additions & 0 deletions crates/iceberg/src/utils.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::num::NonZero;
+
+// Use a default value of 1 as the safest option.
+// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
+// for more details.
+const DEFAULT_PARALLELISM: usize = 1;
+
+/// Uses [`std::thread::available_parallelism`] in order to
+/// retrieve an estimate of the default amount of parallelism
+/// that should be used. Note that [`std::thread::available_parallelism`]
+/// returns a `Result` as it can fail, so here we use
+/// a default value instead.
+/// Note: we don't use a OnceCell or LazyCell here as there
+/// are circumstances where the level of available
+/// parallelism can change during the lifetime of an executing
+/// process, but this should not be called in a hot loop.
+pub(crate) fn available_parallelism() -> NonZero<usize> {
+    std::thread::available_parallelism().unwrap_or_else(|_err| {
+        // Failed to get the level of parallelism.
+        // TODO: log/trace when this fallback occurs.
+
+        // Using a default value.
+        NonZero::new(DEFAULT_PARALLELISM).unwrap()
+    })
+}
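Because the helper returns `NonZero<usize>`, call sites such as `TableScanBuilder::new` and `ArrowReaderBuilder::new` can feed `.get()` straight into a concurrency limit with no zero check. A hypothetical unit test (not part of this PR) pins down that contract:

```rust
#[cfg(test)]
mod tests {
    use super::available_parallelism;

    #[test]
    fn parallelism_is_at_least_one() {
        // Whether std reports the real CPU count or the helper falls back
        // to DEFAULT_PARALLELISM, the result is always a usable limit.
        assert!(available_parallelism().get() >= 1);
    }
}
```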