From aab78d6c897214033c3cc951d780576304910291 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 08:38:39 +0100 Subject: [PATCH 01/14] Add REE file column helpers --- crates/iceberg/src/arrow/reader.rs | 78 ++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index e0894ad6b..905d6a3da 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -492,6 +492,84 @@ impl ArrowReader { Ok(results.into()) } + /// Helper function to add a `_file` column to a RecordBatch. + /// Takes the array and field to add, reducing code duplication. + fn create_file_field( + batch: RecordBatch, + file_array: ArrayRef, + file_field: Field, + field_id: i32, + ) -> Result { + let mut columns = batch.columns().to_vec(); + columns.push(file_array); + + let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); + fields.push(Arc::new(file_field.with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + field_id.to_string(), + )])))); + + let schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(schema, columns).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to add _file column to RecordBatch", + ) + .with_source(e) + }) + } + + /// Adds a `_file` column to the RecordBatch containing the file path. + /// Uses Run-End Encoding (REE) for maximum memory efficiency when the same + /// file path is repeated across all rows. + /// Note: This is only used in tests for now, for production usage we use the + /// non-REE version as it is Julia-compatible. + #[allow(dead_code)] + pub(crate) fn add_file_path_column_ree( + batch: RecordBatch, + file_path: &str, + field_name: &str, + field_id: i32, + ) -> Result { + let num_rows = batch.num_rows(); + + // Use Run-End Encoded array for optimal memory efficiency + // For a constant value repeated num_rows times, this stores: + // - run_ends: [num_rows] (one i32) for non-empty batches, or [] for empty batches + // - values: [file_path] (one string) for non-empty batches, or [] for empty batches + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + let values = if num_rows == 0 { + StringArray::from(Vec::<&str>::new()) + } else { + StringArray::from(vec![file_path]) + }; + + let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for _file column", + ) + .with_source(e) + })?; + + // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE + // DataType is RunEndEncoded with Int32 run ends and Utf8 values + // Note: values field is nullable to match what RunArray::try_new(..) expects. 
+ let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + let file_field = Field::new( + field_name, + DataType::RunEndEncoded(run_ends_field, values_field), + false, + ); + + Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) + } + fn build_field_id_set_and_map( parquet_schema: &SchemaDescriptor, predicate: &BoundPredicate, From ee21cab449becfdd0f32d1ef1ae23318b6f13e6a Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 08:40:04 +0100 Subject: [PATCH 02/14] Add helper tests --- crates/iceberg/src/arrow/reader.rs | 164 +++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 905d6a3da..c5cd28f2f 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -2370,6 +2370,170 @@ message schema { assert!(col_b.is_null(2)); } + #[test] + fn test_add_file_path_column_ree() { + use arrow_array::{Array, Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + + // Create a simple test batch with 2 columns and 3 rows + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let id_array = Int32Array::from(vec![1, 2, 3]); + let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]); + + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(name_array), + ]) + .unwrap(); + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + + // Add file path column with REE + let file_path = "/path/to/data/file.parquet"; + let result = ArrowReader::add_file_path_column_ree( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + assert!(result.is_ok(), "Should successfully add file path column"); + + let new_batch = result.unwrap(); + + // Verify the new batch has 3 columns + assert_eq!(new_batch.num_columns(), 3); + assert_eq!(new_batch.num_rows(), 3); + + // Verify schema has the _file column + let schema = new_batch.schema(); + assert_eq!(schema.fields().len(), 3); + + let file_field = schema.field(2); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + assert!(!file_field.is_nullable()); + + // Verify the field has the correct metadata + let metadata = file_field.metadata(); + assert_eq!( + metadata.get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the data type is RunEndEncoded + match file_field.data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.name(), "run_ends"); + assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert!(!run_ends_field.is_nullable()); + + assert_eq!(values_field.name(), "values"); + assert_eq!(values_field.data_type(), &DataType::Utf8); + } + _ => panic!("Expected RunEndEncoded data type for _file column"), + } + + // Verify the original columns are intact + let id_col = new_batch + .column(0) + .as_primitive::(); + assert_eq!(id_col.values(), &[1, 2, 3]); + + let name_col = new_batch.column(1).as_string::(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(name_col.value(2), "Charlie"); + + // Verify the file path column contains the correct value + // The _file column is a RunArray, so we need to decode it + let file_col = new_batch.column(2); + let run_array = 
file_col + .as_any() + .downcast_ref::>() + .expect("Expected RunArray for _file column"); + + // Verify the run array structure (should be optimally encoded) + let run_ends = run_array.run_ends(); + assert_eq!(run_ends.values().len(), 1, "Should have only 1 run end"); + assert_eq!( + run_ends.values()[0], + new_batch.num_rows() as i32, + "Run end should equal number of rows" + ); + + // Check that the single value in the RunArray is the expected file path + let values = run_array.values(); + let string_values = values.as_string::(); + assert_eq!(string_values.len(), 1, "Should have only 1 value"); + assert_eq!(string_values.value(0), file_path); + + assert!( + string_values + .downcast_ref::() + .unwrap() + .iter() + .all(|v| v == Some(file_path)) + ) + } + + #[test] + fn test_add_file_path_column_ree_empty_batch() { + use arrow_array::RecordBatch; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + // Create an empty batch + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let id_array = arrow_array::Int32Array::from(Vec::::new()); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); + + assert_eq!(batch.num_rows(), 0); + + // Add file path column to empty batch with REE + let file_path = "/empty/file.parquet"; + let result = ArrowReader::add_file_path_column_ree( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + + // Should succeed with empty RunArray for empty batches + assert!(result.is_ok()); + let new_batch = result.unwrap(); + assert_eq!(new_batch.num_rows(), 0); + assert_eq!(new_batch.num_columns(), 2); + + // Verify the _file column exists with correct schema + let schema = new_batch.schema(); + let file_field = schema.field(1); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + + // Should use RunEndEncoded even for empty batches + match file_field.data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert_eq!(values_field.data_type(), &DataType::Utf8); + } + _ => panic!("Expected RunEndEncoded data type for _file column"), + } + + // Verify metadata with reserved field ID + assert_eq!( + file_field.metadata().get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the file path column is empty but properly structured + let file_path_column = new_batch.column(1); + assert_eq!(file_path_column.len(), 0); + } + /// Test for bug where position deletes in later row groups are not applied correctly. /// /// When a file has multiple row groups and a position delete targets a row in a later From 37b52e2707a0e4717d9143ce323f66efde7cda91 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 08:41:20 +0100 Subject: [PATCH 03/14] Add constants --- crates/iceberg/src/arrow/reader.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c5cd28f2f..4c0179b37 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -59,6 +59,14 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; +/// Column name for the file path metadata column per Iceberg spec +/// This is dead code for now but will be used when we add the _file column support. 
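+/// Once the scan-side integration later in this series lands, this name is expected
+/// to be selectable alongside regular columns, e.g. `table.scan().select(["id", "_file"])`
+/// (illustrative usage; see the later patches adding `_file` support to `TableScanBuilder`).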
+#[allow(dead_code)] +pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; + +/// Reserved field ID for the file path column used in delete file reading. +pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546; + /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, From 44463a0ce1de8809cbb94a06f3707650d4d3e1ee Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 10:05:55 +0100 Subject: [PATCH 04/14] Add support for _file constant --- crates/iceberg/src/arrow/delete_filter.rs | 2 + crates/iceberg/src/arrow/reader.rs | 114 ++++++--- crates/iceberg/src/scan/context.rs | 9 + crates/iceberg/src/scan/mod.rs | 281 +++++++++++++++++++++- crates/iceberg/src/scan/task.rs | 5 + 5 files changed, 366 insertions(+), 45 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index b853baa99..01b71cd4c 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -338,6 +338,7 @@ pub(crate) mod tests { schema: data_file_schema.clone(), project_field_ids: vec![], predicate: None, + file_column_position: None, deletes: vec![pos_del_1, pos_del_2.clone()], }, FileScanTask { @@ -349,6 +350,7 @@ pub(crate) mod tests { schema: data_file_schema.clone(), project_field_ids: vec![], predicate: None, + file_column_position: None, deletes: vec![pos_del_3], }, ]; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4c0179b37..6eedb3796 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -23,11 +23,14 @@ use std::str::FromStr; use std::sync::Arc; use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene}; -use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar}; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Datum as ArrowDatum, Int32Array, RecordBatch, RunArray, Scalar, + StringArray, +}; use arrow_cast::cast::cast; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ - ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + ArrowError, DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use arrow_string::like::starts_with; use bytes::Bytes; @@ -59,14 +62,12 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; +/// Reserved field ID for the file path (_file) column per Iceberg spec +pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; + /// Column name for the file path metadata column per Iceberg spec -/// This is dead code for now but will be used when we add the _file column support. -#[allow(dead_code)] pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; -/// Reserved field ID for the file path column used in delete file reading. -pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546; - /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, @@ -344,13 +345,33 @@ impl ArrowReader { record_batch_stream_builder.with_row_groups(selected_row_group_indices); } + // Get the _file column position from the task (if requested) + let file_column_position = task.file_column_position; + let data_file_path = task.data_file_path.clone(); + // Build the batch stream and send all the RecordBatches that it generates // to the requester. let record_batch_stream = record_batch_stream_builder .build()? 
.map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), + Ok(batch) => { + let processed_batch = + record_batch_transformer.process_record_batch(batch)?; + + // Add the _file column if requested at the correct position + if let Some(position) = file_column_position { + Self::add_file_path_column_ree_at_position( + processed_batch, + &data_file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + position, + ) + } else { + Ok(processed_batch) + } + } Err(err) => Err(err.into()), }); @@ -500,22 +521,34 @@ impl ArrowReader { Ok(results.into()) } - /// Helper function to add a `_file` column to a RecordBatch. - /// Takes the array and field to add, reducing code duplication. - fn create_file_field( + /// Helper function to add a `_file` column to a RecordBatch at a specific position. + /// Takes the array, field to add, and position where to insert. + fn create_file_field_at_position( batch: RecordBatch, file_array: ArrayRef, file_field: Field, field_id: i32, + position: usize, ) -> Result { - let mut columns = batch.columns().to_vec(); - columns.push(file_array); - - let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); - fields.push(Arc::new(file_field.with_metadata(HashMap::from([( + let file_field_with_metadata = Arc::new(file_field.with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string(), - )])))); + )]))); + + // Build columns vector in a single pass without insert + let original_columns = batch.columns(); + let mut columns = Vec::with_capacity(original_columns.len() + 1); + columns.extend_from_slice(&original_columns[..position]); + columns.push(file_array); + columns.extend_from_slice(&original_columns[position..]); + + // Build fields vector in a single pass without insert + let schema = batch.schema(); + let original_fields = schema.fields(); + let mut fields = Vec::with_capacity(original_fields.len() + 1); + fields.extend(original_fields[..position].iter().cloned()); + fields.push(file_field_with_metadata); + fields.extend(original_fields[position..].iter().cloned()); let schema = Arc::new(ArrowSchema::new(fields)); RecordBatch::try_new(schema, columns).map_err(|e| { @@ -527,24 +560,18 @@ impl ArrowReader { }) } - /// Adds a `_file` column to the RecordBatch containing the file path. - /// Uses Run-End Encoding (REE) for maximum memory efficiency when the same - /// file path is repeated across all rows. - /// Note: This is only used in tests for now, for production usage we use the - /// non-REE version as it is Julia-compatible. - #[allow(dead_code)] - pub(crate) fn add_file_path_column_ree( + /// Adds a `_file` column to the RecordBatch at a specific position. + /// Uses Run-End Encoding (REE) for maximum memory efficiency. 
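+    /// For example (illustrative): inserting into a batch with columns
+    /// ["id", "name"] at `position = 1` yields ["id", "_file", "name"], while
+    /// `position = 2` appends the `_file` column after the existing columns.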
+ pub(crate) fn add_file_path_column_ree_at_position( batch: RecordBatch, file_path: &str, field_name: &str, field_id: i32, + position: usize, ) -> Result { let num_rows = batch.num_rows(); // Use Run-End Encoded array for optimal memory efficiency - // For a constant value repeated num_rows times, this stores: - // - run_ends: [num_rows] (one i32) for non-empty batches, or [] for empty batches - // - values: [file_path] (one string) for non-empty batches, or [] for empty batches let run_ends = if num_rows == 0 { Int32Array::from(Vec::::new()) } else { @@ -564,9 +591,6 @@ impl ArrowReader { .with_source(e) })?; - // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE - // DataType is RunEndEncoded with Int32 run ends and Utf8 values - // Note: values field is nullable to match what RunArray::try_new(..) expects. let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); let file_field = Field::new( @@ -575,7 +599,13 @@ impl ArrowReader { false, ); - Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) + Self::create_file_field_at_position( + batch, + Arc::new(file_array), + file_field, + field_id, + position, + ) } fn build_field_id_set_and_map( @@ -1573,6 +1603,7 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; + use as_any::Downcast; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::{ArrowWriter, ProjectionMask}; @@ -1586,7 +1617,9 @@ mod tests { use crate::ErrorKind; use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY}; - use crate::arrow::{ArrowReader, ArrowReaderBuilder}; + use crate::arrow::{ + ArrowReader, ArrowReaderBuilder, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, + }; use crate::delete_vector::DeleteVector; use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; @@ -1887,6 +1920,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1], predicate: Some(predicate.bind(schema, true).unwrap()), + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -2205,6 +2239,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1], predicate: None, + file_column_position: None, deletes: vec![], }; @@ -2218,6 +2253,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1], predicate: None, + file_column_position: None, deletes: vec![], }; @@ -2342,6 +2378,7 @@ message schema { schema: new_schema.clone(), project_field_ids: vec![1, 2], // Request both columns 'a' and 'b' predicate: None, + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -2401,13 +2438,14 @@ message schema { assert_eq!(batch.num_columns(), 2); assert_eq!(batch.num_rows(), 3); - // Add file path column with REE + // Add file path column with REE at the end (position 2) let file_path = "/path/to/data/file.parquet"; - let result = ArrowReader::add_file_path_column_ree( + let result = ArrowReader::add_file_path_column_ree_at_position( batch, file_path, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, + 2, // Position at the end after id and name columns ); assert!(result.is_ok(), "Should successfully add file path column"); @@ -2502,13 +2540,14 @@ message schema { assert_eq!(batch.num_rows(), 0); - // Add file path column to empty batch with 
REE + // Add file path column to empty batch with REE at position 1 (after id column) let file_path = "/empty/file.parquet"; - let result = ArrowReader::add_file_path_column_ree( + let result = ArrowReader::add_file_path_column_ree_at_position( batch, file_path, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, + 1, // Position 1, after the id column ); // Should succeed with empty RunArray for empty batches @@ -2669,6 +2708,7 @@ message schema { schema: table_schema.clone(), project_field_ids: vec![1], predicate: None, + file_column_position: None, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, @@ -2884,6 +2924,7 @@ message schema { schema: table_schema.clone(), project_field_ids: vec![1], predicate: None, + file_column_position: None, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, @@ -3092,6 +3133,7 @@ message schema { schema: table_schema.clone(), project_field_ids: vec![1], predicate: None, + file_column_position: None, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 3f7c29dbf..48652890d 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -46,6 +46,7 @@ pub(crate) struct ManifestFileContext { snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, + file_column_position: Option, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext { pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, pub delete_file_index: DeleteFileIndex, + pub file_column_position: Option, } impl ManifestFileContext { @@ -74,6 +76,7 @@ impl ManifestFileContext { mut sender, expression_evaluator_cache, delete_file_index, + file_column_position, .. 
} = self; @@ -89,6 +92,7 @@ impl ManifestFileContext { bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), delete_file_index: delete_file_index.clone(), + file_column_position, }; sender @@ -127,6 +131,8 @@ impl ManifestEntryContext { .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), + file_column_position: self.file_column_position, + deletes, }) } @@ -149,6 +155,8 @@ pub(crate) struct PlanContext { pub partition_filter_cache: Arc, pub manifest_evaluator_cache: Arc, pub expression_evaluator_cache: Arc, + + pub file_column_position: Option, } impl PlanContext { @@ -260,6 +268,7 @@ impl PlanContext { field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, + file_column_position: self.file_column_position, } } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 6884e00b9..05f1c81e9 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -31,7 +31,7 @@ use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; -use crate::arrow::ArrowReaderBuilder; +use crate::arrow::{ArrowReaderBuilder, RESERVED_COL_NAME_FILE as RESERVED_COL_NAME_FILE_INTERNAL}; use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; @@ -42,6 +42,27 @@ use crate::table::Table; use crate::utils::available_parallelism; use crate::{Error, ErrorKind, Result}; +/// Reserved column name for the file path metadata column. +/// +/// When this column is selected in a table scan (e.g., `.select(["col1", RESERVED_COL_NAME_FILE])`), +/// each row will include the path of the file from which that row was read. +/// This is useful for debugging, auditing, or tracking data lineage. +/// +/// # Example +/// ```no_run +/// # use iceberg::scan::RESERVED_COL_NAME_FILE; +/// # async fn example() -> iceberg::Result<()> { +/// # let table = todo!(); +/// // Select regular columns along with the file path +/// let scan = table +/// .scan() +/// .select(["id", "name", RESERVED_COL_NAME_FILE]) +/// .build()?; +/// # Ok(()) +/// # } +/// ``` +pub const RESERVED_COL_NAME_FILE: &str = RESERVED_COL_NAME_FILE_INTERNAL; + /// A stream of arrow [`RecordBatch`]es. pub type ArrowRecordBatchStream = BoxStream<'static, Result>; @@ -217,9 +238,13 @@ impl<'a> TableScanBuilder<'a> { let schema = snapshot.schema(self.table.metadata())?; - // Check that all column names exist in the schema. + // Check that all column names exist in the schema (skip reserved columns). 
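+        // e.g. `.select(["x", RESERVED_COL_NAME_FILE])` should still build successfully
+        // even though "_file" is not a field of the table schema.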
if let Some(column_names) = self.column_names.as_ref() { for column_name in column_names { + // Skip reserved columns that don't exist in the schema + if column_name == RESERVED_COL_NAME_FILE_INTERNAL { + continue; + } if schema.field_by_name(column_name).is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -239,7 +264,16 @@ impl<'a> TableScanBuilder<'a> { .collect() }); - for column_name in column_names.iter() { + // Track the position of the _file column if requested + let mut file_column_position = None; + + for (index, column_name) in column_names.iter().enumerate() { + // Handle special reserved column "_file" + if column_name == RESERVED_COL_NAME_FILE_INTERNAL { + file_column_position = Some(index); + continue; // Don't add to field_ids - it's a virtual column + } + let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -254,10 +288,10 @@ impl<'a> TableScanBuilder<'a> { Error::new( ErrorKind::FeatureUnsupported, format!( - "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" - ), - ) - })?; + "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" + ), + ) + })?; field_ids.push(field_id); } @@ -280,6 +314,7 @@ impl<'a> TableScanBuilder<'a> { partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + file_column_position, }; Ok(TableScan { @@ -559,8 +594,10 @@ pub mod tests { use std::fs::File; use std::sync::Arc; + use arrow_array::cast::AsArray; use arrow_array::{ - ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, + StringArray, }; use futures::{TryStreamExt, stream}; use minijinja::value::Value; @@ -575,7 +612,7 @@ pub mod tests { use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; - use crate::scan::FileScanTask; + use crate::scan::{FileScanTask, RESERVED_COL_NAME_FILE}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec, @@ -1776,6 +1813,7 @@ pub mod tests { schema: schema.clone(), record_count: Some(100), data_file_format: DataFileFormat::Parquet, + file_column_position: None, deletes: vec![], }; test_fn(task); @@ -1790,8 +1828,233 @@ pub mod tests { schema, record_count: None, data_file_format: DataFileFormat::Avro, + file_column_position: None, deletes: vec![], }; test_fn(task); } + + #[tokio::test] + async fn test_select_with_file_column() { + use arrow_array::cast::AsArray; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select regular columns plus the _file column + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have 2 columns: x and _file + assert_eq!(batches[0].num_columns(), 2); + + // Verify the x column exists and has correct data + let x_col = batches[0].column_by_name("x").unwrap(); + let x_arr = x_col.as_primitive::(); + 
assert_eq!(x_arr.value(0), 1); + + // Verify the _file column exists + let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE); + assert!( + file_col.is_some(), + "_file column should be present in the batch" + ); + + // Verify the _file column contains a file path + let file_col = file_col.unwrap(); + assert!( + matches!( + file_col.data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + + // Decode the RunArray to verify it contains the file path + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + assert_eq!(string_values.len(), 1, "Should have a single file path"); + + let file_path = string_values.value(0); + assert!( + file_path.ends_with(".parquet"), + "File path should end with .parquet, got: {}", + file_path + ); + } + + #[tokio::test] + async fn test_select_file_column_position() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select columns in specific order: x, _file, z + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE, "z"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify column order: x at position 0, _file at position 1, z at position 2 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(2).name(), "z"); + + // Verify columns by name also works + assert!(batches[0].column_by_name("x").is_some()); + assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some()); + assert!(batches[0].column_by_name("z").is_some()); + } + + #[tokio::test] + async fn test_select_file_column_only() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select only the _file column + let table_scan = fixture + .table + .scan() + .select([RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Should have exactly 1 column + assert_eq!(batches[0].num_columns(), 1); + + // Verify it's the _file column + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + + // Verify the batch has the correct number of rows + // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted) + // Each file has 1024 rows, so total is 2048 rows + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2048); + } + + #[tokio::test] + async fn test_file_column_with_multiple_files() { + use std::collections::HashSet; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select x and _file columns + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Collect all unique file paths from the batches + let mut file_paths = HashSet::new(); + for batch in &batches { + let file_col = 
batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap(); + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + for i in 0..string_values.len() { + file_paths.insert(string_values.value(i).to_string()); + } + } + + // We should have multiple files (the test creates 1.parquet and 3.parquet) + assert!(file_paths.len() >= 1, "Should have at least one file path"); + + // All paths should end with .parquet + for path in &file_paths { + assert!( + path.ends_with(".parquet"), + "All file paths should end with .parquet, got: {}", + path + ); + } + } + + #[tokio::test] + async fn test_file_column_at_start() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the start + let table_scan = fixture + .table + .scan() + .select([RESERVED_COL_NAME_FILE, "x", "y"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify _file is at position 0 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(1).name(), "x"); + assert_eq!(schema.field(2).name(), "y"); + } + + #[tokio::test] + async fn test_file_column_at_end() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the end + let table_scan = fixture + .table + .scan() + .select(["x", "y", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify _file is at position 2 (the end) + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), "y"); + assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE); + } } diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 32fe3ae30..2ead68ff3 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -52,6 +52,11 @@ pub struct FileScanTask { #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option, + /// The position of the _file column in the output, if requested. + /// None if the _file column was not requested. 
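+    /// For example, a projection of `["x", "_file", "z"]` corresponds to `Some(1)`.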
+ #[serde(skip_serializing_if = "Option::is_none")] + pub file_column_position: Option, + /// The list of delete files that may need to be applied to this data file pub deletes: Vec, } From e034009b0e65978e08c02c7e0599da1e826ca1bd Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 10:29:59 +0100 Subject: [PATCH 05/14] Update tests --- crates/iceberg/src/arrow/reader.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index f93e368e9..d1204d7a9 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -3385,6 +3385,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2], predicate: None, + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3479,6 +3480,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 3], predicate: None, + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3562,6 +3564,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2, 3], predicate: None, + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3659,6 +3662,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2], predicate: None, + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3785,6 +3789,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2], predicate: None, + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3878,6 +3883,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 5, 2], predicate: None, + file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3984,6 +3990,7 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2, 3], predicate: Some(predicate.bind(schema, true).unwrap()), + file_column_position: None, deletes: vec![], })] .into_iter(), From 4f0a4f19ccc5830d0fc770b62b7cf3e40467fffe Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 10:41:35 +0100 Subject: [PATCH 06/14] Fix clippy warning --- crates/iceberg/src/scan/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 05f1c81e9..f41616cde 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -1994,7 +1994,7 @@ pub mod tests { } // We should have multiple files (the test creates 1.parquet and 3.parquet) - assert!(file_paths.len() >= 1, "Should have at least one file path"); + assert!(!file_paths.is_empty(), "Should have at least one file path"); // All paths should end with .parquet for path in &file_paths { From 51f76d38e7b4c488f34e2254971708e0d8577bc5 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 11:06:53 +0100 Subject: [PATCH 07/14] Fix doc test --- crates/iceberg/src/scan/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index f41616cde..eae5b05d3 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -51,8 +51,9 @@ use crate::{Error, ErrorKind, Result}; /// # Example /// ```no_run /// # use iceberg::scan::RESERVED_COL_NAME_FILE; +/// # use iceberg::table::Table; /// # async fn example() -> iceberg::Result<()> { -/// # let table = todo!(); +/// # let table: Table = todo!(); /// // Select regular columns along with the file path /// let scan = table /// .scan() From d84e16b8d123963c8b274308002b4f9b0cc63415 Mon Sep 17 00:00:00 
2001 From: Gerald Berger Date: Tue, 4 Nov 2025 12:52:44 +0100 Subject: [PATCH 08/14] Track in field ids --- crates/iceberg/src/arrow/delete_filter.rs | 2 -- crates/iceberg/src/arrow/reader.rs | 37 ++++++++++++----------- crates/iceberg/src/scan/context.rs | 9 ------ crates/iceberg/src/scan/mod.rs | 17 +++++------ crates/iceberg/src/scan/task.rs | 5 --- 5 files changed, 26 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 01b71cd4c..b853baa99 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -338,7 +338,6 @@ pub(crate) mod tests { schema: data_file_schema.clone(), project_field_ids: vec![], predicate: None, - file_column_position: None, deletes: vec![pos_del_1, pos_del_2.clone()], }, FileScanTask { @@ -350,7 +349,6 @@ pub(crate) mod tests { schema: data_file_schema.clone(), project_field_ids: vec![], predicate: None, - file_column_position: None, deletes: vec![pos_del_3], }, ]; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index d1204d7a9..ef7891066 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -228,10 +228,26 @@ impl ArrowReader { initial_stream_builder }; + // Check if _file column is requested and filter it out for projection + let mut file_column_position = None; + let project_field_ids_without_virtual: Vec = task + .project_field_ids + .iter() + .enumerate() + .filter_map(|(idx, &field_id)| { + if field_id == RESERVED_FIELD_ID_FILE { + file_column_position = Some(idx); + None + } else { + Some(field_id) + } + }) + .collect(); + // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist), // so we must use position-based projection instead of field-ID matching let projection_mask = Self::get_arrow_projection_mask( - &task.project_field_ids, + &project_field_ids_without_virtual, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), @@ -245,7 +261,7 @@ impl ArrowReader { // that come back from the file, such as type promotion, default column insertion // and column re-ordering let mut record_batch_transformer = - RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); + RecordBatchTransformer::build(task.schema_ref(), &project_field_ids_without_virtual); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -377,8 +393,7 @@ impl ArrowReader { record_batch_stream_builder.with_row_groups(selected_row_group_indices); } - // Get the _file column position from the task (if requested) - let file_column_position = task.file_column_position; + // Clone data_file_path for use in the closure let data_file_path = task.data_file_path.clone(); // Build the batch stream and send all the RecordBatches that it generates @@ -2066,7 +2081,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1], predicate: Some(predicate.bind(schema, true).unwrap()), - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -2385,7 +2399,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1], predicate: None, - file_column_position: None, deletes: vec![], }; @@ -2399,7 +2412,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1], predicate: None, - file_column_position: None, deletes: vec![], }; @@ -2524,7 +2536,6 @@ message schema { schema: new_schema.clone(), project_field_ids: 
vec![1, 2], // Request both columns 'a' and 'b' predicate: None, - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -2854,7 +2865,6 @@ message schema { schema: table_schema.clone(), project_field_ids: vec![1], predicate: None, - file_column_position: None, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, @@ -3070,7 +3080,6 @@ message schema { schema: table_schema.clone(), project_field_ids: vec![1], predicate: None, - file_column_position: None, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, @@ -3279,7 +3288,6 @@ message schema { schema: table_schema.clone(), project_field_ids: vec![1], predicate: None, - file_column_position: None, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, @@ -3385,7 +3393,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2], predicate: None, - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3480,7 +3487,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 3], predicate: None, - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3564,7 +3570,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2, 3], predicate: None, - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3662,7 +3667,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2], predicate: None, - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3789,7 +3793,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2], predicate: None, - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3883,7 +3886,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 5, 2], predicate: None, - file_column_position: None, deletes: vec![], })] .into_iter(), @@ -3990,7 +3992,6 @@ message schema { schema: schema.clone(), project_field_ids: vec![1, 2, 3], predicate: Some(predicate.bind(schema, true).unwrap()), - file_column_position: None, deletes: vec![], })] .into_iter(), diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 48652890d..3f7c29dbf 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -46,7 +46,6 @@ pub(crate) struct ManifestFileContext { snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, - file_column_position: Option, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -60,7 +59,6 @@ pub(crate) struct ManifestEntryContext { pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, pub delete_file_index: DeleteFileIndex, - pub file_column_position: Option, } impl ManifestFileContext { @@ -76,7 +74,6 @@ impl ManifestFileContext { mut sender, expression_evaluator_cache, delete_file_index, - file_column_position, .. 
} = self; @@ -92,7 +89,6 @@ impl ManifestFileContext { bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), delete_file_index: delete_file_index.clone(), - file_column_position, }; sender @@ -131,8 +127,6 @@ impl ManifestEntryContext { .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), - file_column_position: self.file_column_position, - deletes, }) } @@ -155,8 +149,6 @@ pub(crate) struct PlanContext { pub partition_filter_cache: Arc, pub manifest_evaluator_cache: Arc, pub expression_evaluator_cache: Arc, - - pub file_column_position: Option, } impl PlanContext { @@ -268,7 +260,6 @@ impl PlanContext { field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, - file_column_position: self.file_column_position, } } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index eae5b05d3..94e8c2514 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -31,7 +31,10 @@ use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; -use crate::arrow::{ArrowReaderBuilder, RESERVED_COL_NAME_FILE as RESERVED_COL_NAME_FILE_INTERNAL}; +use crate::arrow::{ + ArrowReaderBuilder, RESERVED_COL_NAME_FILE as RESERVED_COL_NAME_FILE_INTERNAL, + RESERVED_FIELD_ID_FILE, +}; use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; @@ -265,14 +268,11 @@ impl<'a> TableScanBuilder<'a> { .collect() }); - // Track the position of the _file column if requested - let mut file_column_position = None; - - for (index, column_name) in column_names.iter().enumerate() { + for column_name in column_names.iter() { // Handle special reserved column "_file" if column_name == RESERVED_COL_NAME_FILE_INTERNAL { - file_column_position = Some(index); - continue; // Don't add to field_ids - it's a virtual column + field_ids.push(RESERVED_FIELD_ID_FILE); + continue; } let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { @@ -315,7 +315,6 @@ impl<'a> TableScanBuilder<'a> { partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), - file_column_position, }; Ok(TableScan { @@ -1814,7 +1813,6 @@ pub mod tests { schema: schema.clone(), record_count: Some(100), data_file_format: DataFileFormat::Parquet, - file_column_position: None, deletes: vec![], }; test_fn(task); @@ -1829,7 +1827,6 @@ pub mod tests { schema, record_count: None, data_file_format: DataFileFormat::Avro, - file_column_position: None, deletes: vec![], }; test_fn(task); diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 2ead68ff3..32fe3ae30 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -52,11 +52,6 @@ pub struct FileScanTask { #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option, - /// The position of the _file column in the output, if requested. - /// None if the _file column was not requested. 
- #[serde(skip_serializing_if = "Option::is_none")] - pub file_column_position: Option, - /// The list of delete files that may need to be applied to this data file pub deletes: Vec, } From bd478cb41e82a925d17f56cc3816108323a0ecd7 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 15:32:17 +0100 Subject: [PATCH 09/14] Add test --- crates/iceberg/src/scan/mod.rs | 61 ++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 94e8c2514..21fa1310a 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -2055,4 +2055,65 @@ pub mod tests { assert_eq!(schema.field(1).name(), "y"); assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE); } + + #[tokio::test] + async fn test_select_called_twice_uses_last_selection() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Call select twice - first with x and y, then with only z and _file + // The second call should override the first + let table_scan = fixture + .table + .scan() + .select(["x", "y", RESERVED_COL_NAME_FILE]) // This should be ignored + .select(["z", RESERVED_COL_NAME_FILE]) // This should be used + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have exactly 2 columns: z and _file (NOT x or y) + assert_eq!( + batches[0].num_columns(), + 2, + "Should have exactly 2 columns from the last select" + ); + + let schema = batches[0].schema(); + + // Verify the columns are z and _file in that order + assert_eq!(schema.field(0).name(), "z", "First column should be z"); + assert_eq!( + schema.field(1).name(), + RESERVED_COL_NAME_FILE, + "Second column should be _file" + ); + + // Verify x and y are NOT present + assert!( + batches[0].column_by_name("x").is_none(), + "Column x should not be present (it was only in the first select)" + ); + assert!( + batches[0].column_by_name("y").is_none(), + "Column y should not be present (it was only in the first select)" + ); + + // Verify z column has data + let z_col = batches[0].column_by_name("z").unwrap(); + let z_arr = z_col.as_primitive::(); + assert!(z_arr.len() > 0, "Column z should have data"); + + // Verify _file column exists and is properly formatted + let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE).unwrap(); + assert!( + matches!( + file_col.data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + } } From 9b186c79e588adfd40c2e7f160bb1c2a06c0a7d9 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 16:04:24 +0100 Subject: [PATCH 10/14] Allow repeated virtual file column selection --- crates/iceberg/src/arrow/reader.rs | 20 ++++---- crates/iceberg/src/scan/mod.rs | 80 ++++++++++++++++++++---------- 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ef7891066..8b6524971 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -229,14 +229,14 @@ impl ArrowReader { }; // Check if _file column is requested and filter it out for projection - let mut file_column_position = None; + let mut file_column_positions = Vec::new(); let project_field_ids_without_virtual: Vec = task .project_field_ids .iter() .enumerate() .filter_map(|(idx, &field_id)| { if field_id == RESERVED_FIELD_ID_FILE { - 
file_column_position = Some(idx); + file_column_positions.push(idx); None } else { Some(field_id) @@ -403,21 +403,23 @@ impl ArrowReader { .build()? .map(move |batch| match batch { Ok(batch) => { - let processed_batch = + let mut processed_batch = record_batch_transformer.process_record_batch(batch)?; - // Add the _file column if requested at the correct position - if let Some(position) = file_column_position { - Self::add_file_path_column_ree_at_position( + // Add the _file column at each requested position + // We insert them back at their original positions since we're reconstructing + // the original column order + for &position in &file_column_positions { + processed_batch = Self::add_file_path_column_ree_at_position( processed_batch, &data_file_path, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, position, - ) - } else { - Ok(processed_batch) + )?; } + + Ok(processed_batch) } Err(err) => Err(err.into()), }); diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 21fa1310a..740dab94d 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -2057,63 +2057,93 @@ pub mod tests { } #[tokio::test] - async fn test_select_called_twice_uses_last_selection() { + async fn test_select_with_repeated_column_names() { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - // Call select twice - first with x and y, then with only z and _file - // The second call should override the first + // Select with repeated column names - both regular columns and virtual columns + // Repeated columns should appear multiple times in the result (duplicates are allowed) let table_scan = fixture .table .scan() - .select(["x", "y", RESERVED_COL_NAME_FILE]) // This should be ignored - .select(["z", RESERVED_COL_NAME_FILE]) // This should be used + .select([ + "x", + RESERVED_COL_NAME_FILE, + "x", // x repeated + "y", + RESERVED_COL_NAME_FILE, // _file repeated + "y", // y repeated + ]) + .with_row_selection_enabled(true) .build() .unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); - // Verify we have exactly 2 columns: z and _file (NOT x or y) + // Verify we have exactly 6 columns (duplicates are allowed and preserved) assert_eq!( batches[0].num_columns(), - 2, - "Should have exactly 2 columns from the last select" + 6, + "Should have exactly 6 columns with duplicates" ); let schema = batches[0].schema(); - // Verify the columns are z and _file in that order - assert_eq!(schema.field(0).name(), "z", "First column should be z"); + // Verify columns appear in the exact order requested: x, _file, x, y, _file, y + assert_eq!(schema.field(0).name(), "x", "Column 0 should be x"); assert_eq!( schema.field(1).name(), RESERVED_COL_NAME_FILE, - "Second column should be _file" + "Column 1 should be _file" + ); + assert_eq!( + schema.field(2).name(), + "x", + "Column 2 should be x (duplicate)" + ); + assert_eq!(schema.field(3).name(), "y", "Column 3 should be y"); + assert_eq!( + schema.field(4).name(), + RESERVED_COL_NAME_FILE, + "Column 4 should be _file (duplicate)" + ); + assert_eq!( + schema.field(5).name(), + "y", + "Column 5 should be y (duplicate)" ); - // Verify x and y are NOT present + // Verify all columns have correct data types assert!( - batches[0].column_by_name("x").is_none(), - "Column x should not be present (it was only in the first select)" + matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64), + "Column x should be Int64" ); 
assert!( - batches[0].column_by_name("y").is_none(), - "Column y should not be present (it was only in the first select)" + matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64), + "Column x (duplicate) should be Int64" + ); + assert!( + matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64), + "Column y should be Int64" + ); + assert!( + matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64), + "Column y (duplicate) should be Int64" ); - - // Verify z column has data - let z_col = batches[0].column_by_name("z").unwrap(); - let z_arr = z_col.as_primitive::(); - assert!(z_arr.len() > 0, "Column z should have data"); - - // Verify _file column exists and is properly formatted - let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE).unwrap(); assert!( matches!( - file_col.data_type(), + schema.field(1).data_type(), arrow_schema::DataType::RunEndEncoded(_, _) ), "_file column should use RunEndEncoded type" ); + assert!( + matches!( + schema.field(4).data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column (duplicate) should use RunEndEncoded type" + ); } } From adf0da0022c97660725140dec114b5ca2bf873cf Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Fri, 7 Nov 2025 14:03:33 +0100 Subject: [PATCH 11/14] Refactor into own transformer step --- .../src/arrow/metadata_column_transformer.rs | 282 ++++++++++++++++++ crates/iceberg/src/arrow/mod.rs | 2 + crates/iceberg/src/arrow/reader.rs | 53 ++-- 3 files changed, 301 insertions(+), 36 deletions(-) create mode 100644 crates/iceberg/src/arrow/metadata_column_transformer.rs diff --git a/crates/iceberg/src/arrow/metadata_column_transformer.rs b/crates/iceberg/src/arrow/metadata_column_transformer.rs new file mode 100644 index 000000000..f8a2f70d7 --- /dev/null +++ b/crates/iceberg/src/arrow/metadata_column_transformer.rs @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::{Int32Array, RecordBatch, RunArray, StringArray}; +use arrow_schema::{DataType, Field}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::{Error, ErrorKind, Result}; + +/// Represents different types of metadata column transformations that can be applied to a RecordBatch. +/// Each variant encapsulates the data and logic needed for a specific type of metadata column. +#[derive(Debug, Clone)] +pub(crate) enum MetadataColumnTransformation { + /// Adds a _file column with the file path using Run-End Encoding (REE) for memory efficiency. + /// The _file column stores the file path from which each row was read, with REE ensuring + /// that the same file path value is not repeated in memory for every row. 
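+    /// Illustratively, a 1_000-row batch stores `run_ends = [1000]` and a single
+    /// path in `values`, rather than 1_000 copies of the path string.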
+ FilePath { + file_path: String, + field_name: String, + field_id: i32, + }, + // Future metadata columns can be added here, e.g.: + // PartitionValue { partition_values: HashMap, ... }, + // RowNumber { start: u64, ... }, +} + +impl MetadataColumnTransformation { + /// Applies the transformation to a RecordBatch, adding the metadata column at the specified position. + /// + /// # Arguments + /// * `batch` - The input RecordBatch to transform + /// * `position` - The position at which to insert the metadata column + /// + /// # Returns + /// A new RecordBatch with the metadata column inserted at the given position + pub(crate) fn apply(&self, batch: RecordBatch, position: usize) -> Result { + match self { + Self::FilePath { + file_path, + field_name, + field_id, + } => Self::apply_file_path(batch, file_path, field_name, *field_id, position), + } + } + + /// Applies the file path transformation using Run-End Encoding. + fn apply_file_path( + batch: RecordBatch, + file_path: &str, + field_name: &str, + field_id: i32, + position: usize, + ) -> Result { + let num_rows = batch.num_rows(); + + // Use Run-End Encoded array for optimal memory efficiency + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + let values = if num_rows == 0 { + StringArray::from(Vec::<&str>::new()) + } else { + StringArray::from(vec![file_path]) + }; + + let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for _file column", + ) + .with_source(e) + })?; + + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + let file_field = Field::new( + field_name, + DataType::RunEndEncoded(run_ends_field, values_field), + false, + ); + + Self::insert_column_at_position(batch, Arc::new(file_array), file_field, field_id, position) + } + + /// Inserts a column at the specified position in a RecordBatch. + fn insert_column_at_position( + batch: RecordBatch, + column_array: arrow_array::ArrayRef, + field: Field, + field_id: i32, + position: usize, + ) -> Result { + let field_with_metadata = Arc::new(field.with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + field_id.to_string(), + )]))); + + // Build columns vector in a single pass without insert + let original_columns = batch.columns(); + let mut columns = Vec::with_capacity(original_columns.len() + 1); + columns.extend_from_slice(&original_columns[..position]); + columns.push(column_array); + columns.extend_from_slice(&original_columns[position..]); + + // Build fields vector in a single pass without insert + let schema = batch.schema(); + let original_fields = schema.fields(); + let mut fields = Vec::with_capacity(original_fields.len() + 1); + fields.extend(original_fields[..position].iter().cloned()); + fields.push(field_with_metadata); + fields.extend(original_fields[position..].iter().cloned()); + + let schema = Arc::new(arrow_schema::Schema::new(fields)); + RecordBatch::try_new(schema, columns).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to add metadata column to RecordBatch", + ) + .with_source(e) + }) + } +} + +/// Composes multiple metadata column transformations. +/// +/// This allows us to apply multiple metadata columns in sequence, where each transformation +/// builds on the previous one. For example, adding both _file and partition value columns. 
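+///
+/// A minimal usage sketch (the projected field IDs and file path are illustrative
+/// placeholders; `1` stands for an ordinary data column):
+///
+/// ```ignore
+/// let builder = MetadataTransformer::builder(vec![1, RESERVED_FIELD_ID_FILE])
+///     .with_file_path("/data/part-00000.parquet".to_string());
+/// // Only field 1 is read from Parquet; the reserved `_file` field is synthesized later.
+/// assert_eq!(builder.project_field_ids_without_virtual(), vec![1]);
+/// let transformer = builder.build();
+/// let batch_with_file = transformer.apply(batch)?;
+/// ```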
+pub(crate) struct MetadataTransformer { + transformations: Vec<(MetadataColumnTransformation, usize)>, +} + +impl MetadataTransformer { + /// Creates a new empty MetadataTransformer. + pub(crate) fn new() -> Self { + Self { + transformations: Vec::new(), + } + } + + /// Creates a builder for constructing a MetadataTransformer from projected field IDs. + pub(crate) fn builder(projected_field_ids: Vec) -> MetadataTransformerBuilder { + MetadataTransformerBuilder::new(projected_field_ids) + } + + /// Applies all registered transformations to the given RecordBatch. + /// + /// Transformations are applied in the order they were added. Each transformation + /// inserts a column at its specified position, so later transformations see the + /// updated batch with previously inserted columns. + pub(crate) fn apply(&self, mut batch: RecordBatch) -> Result { + for (transformation, position) in &self.transformations { + batch = transformation.apply(batch, *position)?; + } + Ok(batch) + } + + /// Returns true if there are any transformations to apply. + pub(crate) fn has_transformations(&self) -> bool { + !self.transformations.is_empty() + } +} + +impl Default for MetadataTransformer { + fn default() -> Self { + Self::new() + } +} + +/// Builder for constructing a MetadataTransformer from projected field IDs. +/// +/// This builder analyzes projected field IDs to identify metadata columns (reserved fields) +/// and builds the appropriate transformations. Reserved fields have special handling and +/// are inserted into the RecordBatch at their projected positions. +pub(crate) struct MetadataTransformerBuilder { + projected_field_ids: Vec, + file_path: Option, +} + +impl MetadataTransformerBuilder { + /// Creates a new MetadataTransformerBuilder for the given projected field IDs. + /// + /// # Arguments + /// * `projected_field_ids` - The list of field IDs being projected, including any reserved fields + pub(crate) fn new(projected_field_ids: Vec) -> Self { + Self { + projected_field_ids, + file_path: None, + } + } + + /// Sets the file path for the _file metadata column. + /// + /// # Arguments + /// * `file_path` - The file path to use for the _file column + /// + /// # Returns + /// Self for method chaining + pub(crate) fn with_file_path(mut self, file_path: String) -> Self { + self.file_path = Some(file_path); + self + } + + /// Builds the MetadataTransformer by analyzing projected field IDs and creating appropriate transformations. + /// + /// This method: + /// 1. Iterates through projected field IDs to find reserved fields + /// 2. Calculates the correct position for each reserved field in the final output + /// 3. Creates transformations for each reserved field found + pub(crate) fn build(self) -> MetadataTransformer { + let mut transformations = Vec::new(); + + // Iterate through the projected field IDs and check for reserved fields + for (position, field_id) in self.projected_field_ids.iter().enumerate() { + // Check if this is a reserved field ID for the _file column + if *field_id == RESERVED_FIELD_ID_FILE { + if let Some(ref path) = self.file_path { + let transformation = MetadataColumnTransformation::FilePath { + file_path: path.clone(), + field_name: RESERVED_COL_NAME_FILE.to_string(), + field_id: *field_id, + }; + transformations.push((transformation, position)); + } + } + // Additional reserved fields can be handled here in the future + } + + MetadataTransformer { transformations } + } + + /// Returns the projected field IDs with virtual/reserved fields filtered out. 
+ /// + /// This is used to determine which regular (non-virtual) fields should be read from the data file. + /// Virtual fields are handled by the metadata transformer and should not be included in the + /// Parquet projection. + pub(crate) fn project_field_ids_without_virtual(&self) -> Vec { + self.projected_field_ids + .iter() + .filter(|&&field_id| !Self::is_reserved_field(field_id)) + .copied() + .collect() + } + + /// Checks if a field ID is reserved (virtual). + fn is_reserved_field(field_id: i32) -> bool { + field_id == RESERVED_FIELD_ID_FILE + // Additional reserved fields can be checked here + } +} + +impl Default for MetadataTransformerBuilder { + fn default() -> Self { + Self::new(Vec::new()) + } +} + +// Reserved field IDs and names for metadata columns + +/// Reserved field ID for the file path (_file) column per Iceberg spec +pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; + +/// Reserved column name for the file path metadata column +pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index c091c4517..feb8f3ac4 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -27,6 +27,8 @@ pub(crate) mod caching_delete_file_loader; pub mod delete_file_loader; pub(crate) mod delete_filter; +pub(crate) mod metadata_column_transformer; +pub(crate) use metadata_column_transformer::{RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE}; mod reader; /// RecordBatch projection utilities pub mod record_batch_projector; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 8b6524971..d834eb9fc 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -48,6 +48,7 @@ use parquet::file::metadata::{ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::arrow::metadata_column_transformer::MetadataTransformer; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; @@ -62,12 +63,6 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; -/// Reserved field ID for the file path (_file) column per Iceberg spec -pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; - -/// Column name for the file path metadata column per Iceberg spec -pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; - /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, @@ -228,21 +223,18 @@ impl ArrowReader { initial_stream_builder }; - // Check if _file column is requested and filter it out for projection - let mut file_column_positions = Vec::new(); - let project_field_ids_without_virtual: Vec = task - .project_field_ids - .iter() - .enumerate() - .filter_map(|(idx, &field_id)| { - if field_id == RESERVED_FIELD_ID_FILE { - file_column_positions.push(idx); - None - } else { - Some(field_id) - } - }) - .collect(); + // Build the metadata transformer from the projected field IDs + // This identifies reserved fields (like _file) and creates transformations for them + let metadata_transformer_builder = + MetadataTransformer::builder(task.project_field_ids.clone()) + .with_file_path(task.data_file_path.clone()); + + // Get the field IDs without virtual fields for Parquet projection + let 
project_field_ids_without_virtual = + metadata_transformer_builder.project_field_ids_without_virtual(); + + // Build the metadata transformer (which will handle adding _file columns) + let metadata_transformer = metadata_transformer_builder.build(); // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist), // so we must use position-based projection instead of field-ID matching @@ -403,23 +395,12 @@ impl ArrowReader { .build()? .map(move |batch| match batch { Ok(batch) => { - let mut processed_batch = + // First apply record batch transformations (type promotion, column reordering, etc.) + let processed_batch = record_batch_transformer.process_record_batch(batch)?; - // Add the _file column at each requested position - // We insert them back at their original positions since we're reconstructing - // the original column order - for &position in &file_column_positions { - processed_batch = Self::add_file_path_column_ree_at_position( - processed_batch, - &data_file_path, - RESERVED_COL_NAME_FILE, - RESERVED_FIELD_ID_FILE, - position, - )?; - } - - Ok(processed_batch) + // Then apply metadata transformations (add _file column, etc.) + metadata_transformer.apply(processed_batch) } Err(err) => Err(err.into()), }); From ef3a965e98b418cc3ef722f99f3d1e29f6ec6a85 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Fri, 7 Nov 2025 15:04:35 +0100 Subject: [PATCH 12/14] Revert "Refactor into own transformer step" This reverts commit adf0da0022c97660725140dec114b5ca2bf873cf. --- .../src/arrow/metadata_column_transformer.rs | 282 ------------------ crates/iceberg/src/arrow/mod.rs | 2 - crates/iceberg/src/arrow/reader.rs | 53 ++-- 3 files changed, 36 insertions(+), 301 deletions(-) delete mode 100644 crates/iceberg/src/arrow/metadata_column_transformer.rs diff --git a/crates/iceberg/src/arrow/metadata_column_transformer.rs b/crates/iceberg/src/arrow/metadata_column_transformer.rs deleted file mode 100644 index f8a2f70d7..000000000 --- a/crates/iceberg/src/arrow/metadata_column_transformer.rs +++ /dev/null @@ -1,282 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; -use std::sync::Arc; - -use arrow_array::{Int32Array, RecordBatch, RunArray, StringArray}; -use arrow_schema::{DataType, Field}; -use parquet::arrow::PARQUET_FIELD_ID_META_KEY; - -use crate::{Error, ErrorKind, Result}; - -/// Represents different types of metadata column transformations that can be applied to a RecordBatch. -/// Each variant encapsulates the data and logic needed for a specific type of metadata column. -#[derive(Debug, Clone)] -pub(crate) enum MetadataColumnTransformation { - /// Adds a _file column with the file path using Run-End Encoding (REE) for memory efficiency. 
- /// The _file column stores the file path from which each row was read, with REE ensuring - /// that the same file path value is not repeated in memory for every row. - FilePath { - file_path: String, - field_name: String, - field_id: i32, - }, - // Future metadata columns can be added here, e.g.: - // PartitionValue { partition_values: HashMap, ... }, - // RowNumber { start: u64, ... }, -} - -impl MetadataColumnTransformation { - /// Applies the transformation to a RecordBatch, adding the metadata column at the specified position. - /// - /// # Arguments - /// * `batch` - The input RecordBatch to transform - /// * `position` - The position at which to insert the metadata column - /// - /// # Returns - /// A new RecordBatch with the metadata column inserted at the given position - pub(crate) fn apply(&self, batch: RecordBatch, position: usize) -> Result { - match self { - Self::FilePath { - file_path, - field_name, - field_id, - } => Self::apply_file_path(batch, file_path, field_name, *field_id, position), - } - } - - /// Applies the file path transformation using Run-End Encoding. - fn apply_file_path( - batch: RecordBatch, - file_path: &str, - field_name: &str, - field_id: i32, - position: usize, - ) -> Result { - let num_rows = batch.num_rows(); - - // Use Run-End Encoded array for optimal memory efficiency - let run_ends = if num_rows == 0 { - Int32Array::from(Vec::::new()) - } else { - Int32Array::from(vec![num_rows as i32]) - }; - let values = if num_rows == 0 { - StringArray::from(Vec::<&str>::new()) - } else { - StringArray::from(vec![file_path]) - }; - - let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Failed to create RunArray for _file column", - ) - .with_source(e) - })?; - - let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); - let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); - let file_field = Field::new( - field_name, - DataType::RunEndEncoded(run_ends_field, values_field), - false, - ); - - Self::insert_column_at_position(batch, Arc::new(file_array), file_field, field_id, position) - } - - /// Inserts a column at the specified position in a RecordBatch. - fn insert_column_at_position( - batch: RecordBatch, - column_array: arrow_array::ArrayRef, - field: Field, - field_id: i32, - position: usize, - ) -> Result { - let field_with_metadata = Arc::new(field.with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - field_id.to_string(), - )]))); - - // Build columns vector in a single pass without insert - let original_columns = batch.columns(); - let mut columns = Vec::with_capacity(original_columns.len() + 1); - columns.extend_from_slice(&original_columns[..position]); - columns.push(column_array); - columns.extend_from_slice(&original_columns[position..]); - - // Build fields vector in a single pass without insert - let schema = batch.schema(); - let original_fields = schema.fields(); - let mut fields = Vec::with_capacity(original_fields.len() + 1); - fields.extend(original_fields[..position].iter().cloned()); - fields.push(field_with_metadata); - fields.extend(original_fields[position..].iter().cloned()); - - let schema = Arc::new(arrow_schema::Schema::new(fields)); - RecordBatch::try_new(schema, columns).map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Failed to add metadata column to RecordBatch", - ) - .with_source(e) - }) - } -} - -/// Composes multiple metadata column transformations. 
-/// -/// This allows us to apply multiple metadata columns in sequence, where each transformation -/// builds on the previous one. For example, adding both _file and partition value columns. -pub(crate) struct MetadataTransformer { - transformations: Vec<(MetadataColumnTransformation, usize)>, -} - -impl MetadataTransformer { - /// Creates a new empty MetadataTransformer. - pub(crate) fn new() -> Self { - Self { - transformations: Vec::new(), - } - } - - /// Creates a builder for constructing a MetadataTransformer from projected field IDs. - pub(crate) fn builder(projected_field_ids: Vec) -> MetadataTransformerBuilder { - MetadataTransformerBuilder::new(projected_field_ids) - } - - /// Applies all registered transformations to the given RecordBatch. - /// - /// Transformations are applied in the order they were added. Each transformation - /// inserts a column at its specified position, so later transformations see the - /// updated batch with previously inserted columns. - pub(crate) fn apply(&self, mut batch: RecordBatch) -> Result { - for (transformation, position) in &self.transformations { - batch = transformation.apply(batch, *position)?; - } - Ok(batch) - } - - /// Returns true if there are any transformations to apply. - pub(crate) fn has_transformations(&self) -> bool { - !self.transformations.is_empty() - } -} - -impl Default for MetadataTransformer { - fn default() -> Self { - Self::new() - } -} - -/// Builder for constructing a MetadataTransformer from projected field IDs. -/// -/// This builder analyzes projected field IDs to identify metadata columns (reserved fields) -/// and builds the appropriate transformations. Reserved fields have special handling and -/// are inserted into the RecordBatch at their projected positions. -pub(crate) struct MetadataTransformerBuilder { - projected_field_ids: Vec, - file_path: Option, -} - -impl MetadataTransformerBuilder { - /// Creates a new MetadataTransformerBuilder for the given projected field IDs. - /// - /// # Arguments - /// * `projected_field_ids` - The list of field IDs being projected, including any reserved fields - pub(crate) fn new(projected_field_ids: Vec) -> Self { - Self { - projected_field_ids, - file_path: None, - } - } - - /// Sets the file path for the _file metadata column. - /// - /// # Arguments - /// * `file_path` - The file path to use for the _file column - /// - /// # Returns - /// Self for method chaining - pub(crate) fn with_file_path(mut self, file_path: String) -> Self { - self.file_path = Some(file_path); - self - } - - /// Builds the MetadataTransformer by analyzing projected field IDs and creating appropriate transformations. - /// - /// This method: - /// 1. Iterates through projected field IDs to find reserved fields - /// 2. Calculates the correct position for each reserved field in the final output - /// 3. 
Creates transformations for each reserved field found - pub(crate) fn build(self) -> MetadataTransformer { - let mut transformations = Vec::new(); - - // Iterate through the projected field IDs and check for reserved fields - for (position, field_id) in self.projected_field_ids.iter().enumerate() { - // Check if this is a reserved field ID for the _file column - if *field_id == RESERVED_FIELD_ID_FILE { - if let Some(ref path) = self.file_path { - let transformation = MetadataColumnTransformation::FilePath { - file_path: path.clone(), - field_name: RESERVED_COL_NAME_FILE.to_string(), - field_id: *field_id, - }; - transformations.push((transformation, position)); - } - } - // Additional reserved fields can be handled here in the future - } - - MetadataTransformer { transformations } - } - - /// Returns the projected field IDs with virtual/reserved fields filtered out. - /// - /// This is used to determine which regular (non-virtual) fields should be read from the data file. - /// Virtual fields are handled by the metadata transformer and should not be included in the - /// Parquet projection. - pub(crate) fn project_field_ids_without_virtual(&self) -> Vec { - self.projected_field_ids - .iter() - .filter(|&&field_id| !Self::is_reserved_field(field_id)) - .copied() - .collect() - } - - /// Checks if a field ID is reserved (virtual). - fn is_reserved_field(field_id: i32) -> bool { - field_id == RESERVED_FIELD_ID_FILE - // Additional reserved fields can be checked here - } -} - -impl Default for MetadataTransformerBuilder { - fn default() -> Self { - Self::new(Vec::new()) - } -} - -// Reserved field IDs and names for metadata columns - -/// Reserved field ID for the file path (_file) column per Iceberg spec -pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; - -/// Reserved column name for the file path metadata column -pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index feb8f3ac4..c091c4517 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -27,8 +27,6 @@ pub(crate) mod caching_delete_file_loader; pub mod delete_file_loader; pub(crate) mod delete_filter; -pub(crate) mod metadata_column_transformer; -pub(crate) use metadata_column_transformer::{RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE}; mod reader; /// RecordBatch projection utilities pub mod record_batch_projector; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index d834eb9fc..8b6524971 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -48,7 +48,6 @@ use parquet::file::metadata::{ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; -use crate::arrow::metadata_column_transformer::MetadataTransformer; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; @@ -63,6 +62,12 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; +/// Reserved field ID for the file path (_file) column per Iceberg spec +pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; + +/// Column name for the file path metadata column per Iceberg spec +pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; + /// Builder to create ArrowReader pub struct ArrowReaderBuilder 
{ batch_size: Option, @@ -223,18 +228,21 @@ impl ArrowReader { initial_stream_builder }; - // Build the metadata transformer from the projected field IDs - // This identifies reserved fields (like _file) and creates transformations for them - let metadata_transformer_builder = - MetadataTransformer::builder(task.project_field_ids.clone()) - .with_file_path(task.data_file_path.clone()); - - // Get the field IDs without virtual fields for Parquet projection - let project_field_ids_without_virtual = - metadata_transformer_builder.project_field_ids_without_virtual(); - - // Build the metadata transformer (which will handle adding _file columns) - let metadata_transformer = metadata_transformer_builder.build(); + // Check if _file column is requested and filter it out for projection + let mut file_column_positions = Vec::new(); + let project_field_ids_without_virtual: Vec = task + .project_field_ids + .iter() + .enumerate() + .filter_map(|(idx, &field_id)| { + if field_id == RESERVED_FIELD_ID_FILE { + file_column_positions.push(idx); + None + } else { + Some(field_id) + } + }) + .collect(); // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist), // so we must use position-based projection instead of field-ID matching @@ -395,12 +403,23 @@ impl ArrowReader { .build()? .map(move |batch| match batch { Ok(batch) => { - // First apply record batch transformations (type promotion, column reordering, etc.) - let processed_batch = + let mut processed_batch = record_batch_transformer.process_record_batch(batch)?; - // Then apply metadata transformations (add _file column, etc.) - metadata_transformer.apply(processed_batch) + // Add the _file column at each requested position + // We insert them back at their original positions since we're reconstructing + // the original column order + for &position in &file_column_positions { + processed_batch = Self::add_file_path_column_ree_at_position( + processed_batch, + &data_file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + position, + )?; + } + + Ok(processed_batch) } Err(err) => Err(err.into()), }); From 534490bb8239a87e0cced20529b73018702debbe Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Fri, 7 Nov 2025 15:08:42 +0100 Subject: [PATCH 13/14] Avoid special casing in batch creation --- crates/iceberg/src/arrow/reader.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 8b6524971..d3dde7eaf 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -229,14 +229,12 @@ impl ArrowReader { }; // Check if _file column is requested and filter it out for projection - let mut file_column_positions = Vec::new(); let project_field_ids_without_virtual: Vec = task .project_field_ids .iter() .enumerate() .filter_map(|(idx, &field_id)| { if field_id == RESERVED_FIELD_ID_FILE { - file_column_positions.push(idx); None } else { Some(field_id) @@ -409,14 +407,16 @@ impl ArrowReader { // Add the _file column at each requested position // We insert them back at their original positions since we're reconstructing // the original column order - for &position in &file_column_positions { - processed_batch = Self::add_file_path_column_ree_at_position( - processed_batch, - &data_file_path, - RESERVED_COL_NAME_FILE, - RESERVED_FIELD_ID_FILE, - position, - )?; + for (position, field_id) in task.project_field_ids.iter().enumerate() { + if *field_id == RESERVED_FIELD_ID_FILE { + processed_batch = 
Self::add_file_path_column_ree_at_position( + processed_batch, + &data_file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + position, + )?; + } } Ok(processed_batch) From 04bf463623e12a3f1b26cb7576952c30e48d64d6 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Fri, 7 Nov 2025 15:09:25 +0100 Subject: [PATCH 14/14] . --- crates/iceberg/src/arrow/reader.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index d3dde7eaf..064e0c621 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -232,8 +232,7 @@ impl ArrowReader { let project_field_ids_without_virtual: Vec = task .project_field_ids .iter() - .enumerate() - .filter_map(|(idx, &field_id)| { + .filter_map(|&field_id| { if field_id == RESERVED_FIELD_ID_FILE { None } else {