254 changes: 248 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -224,15 +224,16 @@ impl ArrowReader {
}
};

- // There are two possible sources both for potential lists of selected RowGroup indices,
- // and for `RowSelection`s.
- // Selected RowGroup index lists can come from two sources:
+ // There are three possible sources for potential lists of selected RowGroup indices,
+ // and two for `RowSelection`s.
+ // Selected RowGroup index lists can come from three sources:
// * When task.start and task.length specify a byte range (file splitting);
// * When there are equality delete files that are applicable;
// * When there is a scan predicate and row_group_filtering_enabled = true.
// `RowSelection`s can be created in either or both of the following cases:
// * When there are positional delete files that are applicable;
// * When there is a scan predicate and row_selection_enabled = true
- // Note that, in the former case we only perform row group filtering when
+ // Note that row group filtering from predicates only happens when
// there is a scan predicate AND row_group_filtering_enabled = true,
// but we perform row selection filtering if there are applicable
// equality delete files OR (there is a scan predicate AND row_selection_enabled),
Expand All @@ -241,6 +242,17 @@ impl ArrowReader {
let mut selected_row_group_indices = None;
let mut row_selection = None;

// Filter row groups based on byte range from task.start and task.length.
// If both start and length are 0, read the entire file (backwards compatibility).
if task.start != 0 || task.length != 0 {
let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
record_batch_stream_builder.metadata(),
task.start,
task.length,
)?;
selected_row_group_indices = Some(byte_range_filtered_row_groups);
}
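// Illustration under assumed values (not part of the surrounding code): a FileScanTask
// with start = 4 and length = 1_000 that covers only the first row group seeds
// selected_row_group_indices with just that row group; predicate-based row group
// filtering below can then only narrow that set further via intersection. A task with
// start = 0 and length = 0 skips this block entirely, preserving the old behaviour of
// considering every row group.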

if let Some(predicate) = final_predicate {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
@@ -256,14 +268,26 @@ impl ArrowReader {
record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

if row_group_filtering_enabled {
- let result = Self::get_selected_row_group_indices(
+ let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
&predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
&task.schema,
)?;

- selected_row_group_indices = Some(result);
// Merge predicate-based filtering with byte range filtering (if present)
// by taking the intersection of both filters
selected_row_group_indices = match selected_row_group_indices {
Some(byte_range_filtered) => {
// Keep only row groups that are in both filters
let intersection: Vec<usize> = byte_range_filtered
.into_iter()
.filter(|idx| predicate_filtered_row_groups.contains(idx))
.collect();
Some(intersection)
}
None => Some(predicate_filtered_row_groups),
};
}

if row_selection_enabled {
@@ -717,6 +741,36 @@ impl ArrowReader {

Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
}

/// Filters row groups by byte range to support Iceberg's file splitting.
///
/// Iceberg splits large files at row group boundaries, so we only read row groups
/// whose byte ranges overlap with [start, start+length).
fn filter_row_groups_by_byte_range(
parquet_metadata: &Arc<ParquetMetaData>,
start: u64,
length: u64,
) -> Result<Vec<usize>> {
let row_groups = parquet_metadata.row_groups();
let mut selected = Vec::new();
let end = start + length;

// Row groups are stored sequentially after the 4-byte magic header.
let mut current_byte_offset = 4u64;

for (idx, row_group) in row_groups.iter().enumerate() {
let row_group_size = row_group.compressed_size() as u64;
let row_group_end = current_byte_offset + row_group_size;

if current_byte_offset < end && start < row_group_end {
selected.push(idx);
}

current_byte_offset = row_group_end;
}

Ok(selected)
}
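// Worked example of the overlap check above, using assumed sizes (not taken from the
// surrounding code): with three row groups of 1_000 compressed bytes each, they span
// [4, 1_004), [1_004, 2_004) and [2_004, 3_004) after the 4-byte "PAR1" header. A task
// with start = 1_004 and length = 2_000 gives end = 3_004, so the half-open check
// `current_byte_offset < end && start < row_group_end` selects indices 1 and 2 only.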
}

/// Build the map of parquet field id to Parquet column index in the schema.
@@ -1949,6 +2003,194 @@ message schema {
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}

/// Verifies that file splits respect byte ranges and only read specific row groups.
#[tokio::test]
async fn test_file_splits_respect_byte_ranges() {
use arrow_array::Int32Array;
use parquet::file::reader::{FileReader, SerializedFileReader};

let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));

let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_path = format!("{}/multi_row_group.parquet", &table_location);

// Force each batch into its own row group for testing byte range filtering.
let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
(0..100).collect::<Vec<i32>>(),
))])
.unwrap();
let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
(100..200).collect::<Vec<i32>>(),
))])
.unwrap();
let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
(200..300).collect::<Vec<i32>>(),
))])
.unwrap();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(100)
.build();

let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
writer.write(&batch1).expect("Writing batch 1");
writer.write(&batch2).expect("Writing batch 2");
writer.write(&batch3).expect("Writing batch 3");
writer.close().unwrap();

// Read the file metadata to get row group byte positions
let file = File::open(&file_path).unwrap();
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();

println!("File has {} row groups", metadata.num_row_groups());
assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");

// Get byte positions for each row group
let row_group_0 = metadata.row_group(0);
let row_group_1 = metadata.row_group(1);
let row_group_2 = metadata.row_group(2);

let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
let file_end = rg2_start + row_group_2.compressed_size() as u64;

println!(
"Row group 0: {} rows, starts at byte {}, {} bytes compressed",
row_group_0.num_rows(),
rg0_start,
row_group_0.compressed_size()
);
println!(
"Row group 1: {} rows, starts at byte {}, {} bytes compressed",
row_group_1.num_rows(),
rg1_start,
row_group_1.compressed_size()
);
println!(
"Row group 2: {} rows, starts at byte {}, {} bytes compressed",
row_group_2.num_rows(),
rg2_start,
row_group_2.compressed_size()
);

let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();

// Task 1: read only the first row group
let task1 = FileScanTask {
start: rg0_start,
length: row_group_0.compressed_size() as u64,
record_count: Some(100),
data_file_path: file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
};

// Task 2: read the second and third row groups
let task2 = FileScanTask {
start: rg1_start,
length: file_end - rg1_start,
record_count: Some(200),
data_file_path: file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
};

let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
let result1 = reader
.clone()
.read(tasks1)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
println!(
"Task 1 (bytes {}-{}) returned {} rows",
rg0_start,
rg0_start + row_group_0.compressed_size() as u64,
total_rows_task1
);

let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
let result2 = reader
.read(tasks2)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
println!(
"Task 2 (bytes {}-{}) returned {} rows",
rg1_start, file_end, total_rows_task2
);

assert_eq!(
total_rows_task1, 100,
"Task 1 should read only the first row group (100 rows), but got {} rows",
total_rows_task1
);

assert_eq!(
total_rows_task2, 200,
"Task 2 should read only the second+third row groups (200 rows), but got {} rows",
total_rows_task2
);

// Verify the actual data values are correct (not just the row count)
if total_rows_task1 > 0 {
let first_batch = &result1[0];
let id_col = first_batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let first_val = id_col.value(0);
let last_val = id_col.value(id_col.len() - 1);
println!("Task 1 data range: {} to {}", first_val, last_val);

assert_eq!(first_val, 0, "Task 1 should start with id=0");
assert_eq!(last_val, 99, "Task 1 should end with id=99");
}

if total_rows_task2 > 0 {
let first_batch = &result2[0];
let id_col = first_batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let first_val = id_col.value(0);
println!("Task 2 first value: {}", first_val);

assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
}
}

/// Test schema evolution: reading old Parquet file (with only column 'a')
/// using a newer table schema (with columns 'a' and 'b').
/// This tests that: