313 changes: 307 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -23,11 +23,14 @@ use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_array::{
Array, ArrayRef, BooleanArray, Datum as ArrowDatum, Int32Array, RecordBatch, RunArray, Scalar,
StringArray,
};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
ArrowError, DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
@@ -59,6 +62,12 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Reserved field ID for the file path (`_file`) column, per the Iceberg spec
pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646;

/// Column name for the file path metadata column, per the Iceberg spec
pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file";
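
// Illustrative compile-time check: the spec's reserved metadata column IDs
// count down from i32::MAX, and `_file` is i32::MAX - 1
const _: () = assert!(RESERVED_FIELD_ID_FILE == i32::MAX - 1);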

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
@@ -219,10 +228,23 @@ impl ArrowReader {
initial_stream_builder
};

// The virtual `_file` column has no corresponding field in the Parquet
// file, so filter it out of the projected field IDs
let project_field_ids_without_virtual: Vec<i32> = task
.project_field_ids
.iter()
.copied()
.filter(|&field_id| field_id != RESERVED_FIELD_ID_FILE)
.collect();

// Fallback IDs don't match Parquet's embedded field IDs (since they don't exist),
// so we must use position-based projection instead of field-ID matching
let projection_mask = Self::get_arrow_projection_mask(
&task.project_field_ids,
&project_field_ids_without_virtual,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
@@ -236,7 +258,7 @@
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
RecordBatchTransformer::build(task.schema_ref(), &project_field_ids_without_virtual);

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -368,13 +390,36 @@ impl ArrowReader {
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
}

// Clone data_file_path for use in the closure
let data_file_path = task.data_file_path.clone();

// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let record_batch_stream =
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Ok(batch) => {
let mut processed_batch =
record_batch_transformer.process_record_batch(batch)?;

// Re-insert the `_file` column at each position where it was requested,
// reconstructing the originally projected column order
for (position, field_id) in task.project_field_ids.iter().enumerate() {
if *field_id == RESERVED_FIELD_ID_FILE {
processed_batch = Self::add_file_path_column_ree_at_position(
processed_batch,
&data_file_path,
RESERVED_COL_NAME_FILE,
RESERVED_FIELD_ID_FILE,
position,
)?;
}
}

Ok(processed_batch)
}
Err(err) => Err(err.into()),
});
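
// Worked example (illustrative): with project_field_ids = [1, RESERVED_FIELD_ID_FILE, 2],
// the transformer produces columns for fields 1 and 2 only, and the loop
// above re-inserts `_file` at index 1, restoring the requested order
// [field_1, _file, field_2].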

@@ -523,6 +568,93 @@ impl ArrowReader {
Ok(results.into())
}

/// Helper function to add a `_file` column to a RecordBatch at a specific position.
/// Takes the column array, the field to add, and the position at which to insert it.
fn create_file_field_at_position(
batch: RecordBatch,
file_array: ArrayRef,
file_field: Field,
field_id: i32,
position: usize,
) -> Result<RecordBatch> {
let file_field_with_metadata = Arc::new(file_field.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
field_id.to_string(),
)])));

// Build columns vector in a single pass without insert
let original_columns = batch.columns();
let mut columns = Vec::with_capacity(original_columns.len() + 1);
columns.extend_from_slice(&original_columns[..position]);
columns.push(file_array);
columns.extend_from_slice(&original_columns[position..]);

// Build fields vector in a single pass without insert
let schema = batch.schema();
let original_fields = schema.fields();
let mut fields = Vec::with_capacity(original_fields.len() + 1);
fields.extend(original_fields[..position].iter().cloned());
fields.push(file_field_with_metadata);
fields.extend(original_fields[position..].iter().cloned());

let schema = Arc::new(ArrowSchema::new(fields));
RecordBatch::try_new(schema, columns).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to add _file column to RecordBatch",
)
.with_source(e)
})
}
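
// Design note: extend_from_slice + push builds each vector with a single
// exact-capacity allocation; cloning the originals and calling Vec::insert
// would instead shift every element after `position`.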

/// Adds a `_file` column to the RecordBatch at a specific position.
/// Uses Run-End Encoding (REE) so the constant file path is stored once, regardless of row count.
pub(crate) fn add_file_path_column_ree_at_position(
batch: RecordBatch,
file_path: &str,
field_name: &str,
field_id: i32,
position: usize,
) -> Result<RecordBatch> {
let num_rows = batch.num_rows();

// Use a Run-End Encoded array: a single run spans the batch, so the path string is stored only once
let run_ends = if num_rows == 0 {
Int32Array::from(Vec::<i32>::new())
} else {
Int32Array::from(vec![num_rows as i32])
};
let values = if num_rows == 0 {
StringArray::from(Vec::<&str>::new())
} else {
StringArray::from(vec![file_path])
};

let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create RunArray for _file column",
)
.with_source(e)
})?;

let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
let values_field = Arc::new(Field::new("values", DataType::Utf8, true));
let file_field = Field::new(
field_name,
DataType::RunEndEncoded(run_ends_field, values_field),
false,
);

Self::create_file_field_at_position(
batch,
Arc::new(file_array),
file_field,
field_id,
position,
)
}
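
// Memory note (illustrative arithmetic): for a 1_000_000-row batch with a
// 100-byte path, a plain StringArray would carry ~100 MB of value bytes plus
// ~4 MB of i32 offsets, whereas this RunArray stores one i32 run end and the
// path string once.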

fn build_field_id_set_and_map(
parquet_schema: &SchemaDescriptor,
predicate: &BoundPredicate,
@@ -1626,6 +1758,7 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use as_any::Downcast;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::{ArrowWriter, ProjectionMask};
@@ -1639,7 +1772,9 @@

use crate::ErrorKind;
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
use crate::arrow::{
ArrowReader, ArrowReaderBuilder, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE,
};
use crate::delete_vector::DeleteVector;
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Predicate, Reference};
@@ -2438,6 +2573,172 @@ message schema {
assert!(col_b.is_null(2));
}

#[test]
fn test_add_file_path_column_ree() {
use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

// Create a simple test batch with 2 columns and 3 rows
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));

let id_array = Int32Array::from(vec![1, 2, 3]);
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);

let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(id_array),
Arc::new(name_array),
])
.unwrap();

assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 3);

// Add file path column with REE at the end (position 2)
let file_path = "/path/to/data/file.parquet";
let result = ArrowReader::add_file_path_column_ree_at_position(
batch,
file_path,
RESERVED_COL_NAME_FILE,
RESERVED_FIELD_ID_FILE,
2, // Position at the end after id and name columns
);
assert!(result.is_ok(), "Should successfully add file path column");

let new_batch = result.unwrap();

// Verify the new batch has 3 columns
assert_eq!(new_batch.num_columns(), 3);
assert_eq!(new_batch.num_rows(), 3);

// Verify schema has the _file column
let schema = new_batch.schema();
assert_eq!(schema.fields().len(), 3);

let file_field = schema.field(2);
assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE);
assert!(!file_field.is_nullable());

// Verify the field has the correct metadata
let metadata = file_field.metadata();
assert_eq!(
metadata.get(PARQUET_FIELD_ID_META_KEY),
Some(&RESERVED_FIELD_ID_FILE.to_string())
);

// Verify the data type is RunEndEncoded
match file_field.data_type() {
DataType::RunEndEncoded(run_ends_field, values_field) => {
assert_eq!(run_ends_field.name(), "run_ends");
assert_eq!(run_ends_field.data_type(), &DataType::Int32);
assert!(!run_ends_field.is_nullable());

assert_eq!(values_field.name(), "values");
assert_eq!(values_field.data_type(), &DataType::Utf8);
}
_ => panic!("Expected RunEndEncoded data type for _file column"),
}

// Verify the original columns are intact
let id_col = new_batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.values(), &[1, 2, 3]);

let name_col = new_batch.column(1).as_string::<i32>();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
assert_eq!(name_col.value(2), "Charlie");

// Verify the file path column contains the correct value
// The _file column is a RunArray, so we need to decode it
let file_col = new_batch.column(2);
let run_array = file_col
.as_any()
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
.expect("Expected RunArray for _file column");

// Verify the run array structure (a single run should span all rows)
let run_ends = run_array.run_ends();
assert_eq!(run_ends.values().len(), 1, "Should have only 1 run end");
assert_eq!(
run_ends.values()[0],
new_batch.num_rows() as i32,
"Run end should equal number of rows"
);

// Check that the single value in the RunArray is the expected file path
let values = run_array.values();
let string_values = values.as_string::<i32>();
assert_eq!(string_values.len(), 1, "Should have only 1 value");
assert_eq!(string_values.value(0), file_path);

assert!(
string_values
.downcast_ref::<StringArray>()
.unwrap()
.iter()
.all(|v| v == Some(file_path))
)
}

#[test]
fn test_add_file_path_column_ree_empty_batch() {
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

// Create an empty batch
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

let id_array = arrow_array::Int32Array::from(Vec::<i32>::new());
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();

assert_eq!(batch.num_rows(), 0);

// Add file path column to empty batch with REE at position 1 (after id column)
let file_path = "/empty/file.parquet";
let result = ArrowReader::add_file_path_column_ree_at_position(
batch,
file_path,
RESERVED_COL_NAME_FILE,
RESERVED_FIELD_ID_FILE,
1, // Position 1, after the id column
);

// Should succeed with empty RunArray for empty batches
assert!(result.is_ok());
let new_batch = result.unwrap();
assert_eq!(new_batch.num_rows(), 0);
assert_eq!(new_batch.num_columns(), 2);

// Verify the _file column exists with correct schema
let schema = new_batch.schema();
let file_field = schema.field(1);
assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE);

// Should use RunEndEncoded even for empty batches
match file_field.data_type() {
DataType::RunEndEncoded(run_ends_field, values_field) => {
assert_eq!(run_ends_field.data_type(), &DataType::Int32);
assert_eq!(values_field.data_type(), &DataType::Utf8);
}
_ => panic!("Expected RunEndEncoded data type for _file column"),
}

// Verify metadata with reserved field ID
assert_eq!(
file_field.metadata().get(PARQUET_FIELD_ID_META_KEY),
Some(&RESERVED_FIELD_ID_FILE.to_string())
);

// Verify the file path column is empty but properly structured
let file_path_column = new_batch.column(1);
assert_eq!(file_path_column.len(), 0);
}

/// Test for bug where position deletes in later row groups are not applied correctly.
///
/// When a file has multiple row groups and a position delete targets a row in a later