diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index ff4cff0a64..720b9363b9 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -224,15 +224,16 @@ impl ArrowReader {
             }
         };
 
-        // There are two possible sources both for potential lists of selected RowGroup indices,
-        // and for `RowSelection`s.
-        // Selected RowGroup index lists can come from two sources:
+        // There are three possible sources for potential lists of selected RowGroup indices,
+        // and two for `RowSelection`s.
+        // Selected RowGroup index lists can come from three sources:
+        //   * When task.start and task.length specify a byte range (file splitting);
         //   * When there are equality delete files that are applicable;
         //   * When there is a scan predicate and row_group_filtering_enabled = true.
         // `RowSelection`s can be created in either or both of the following cases:
         //   * When there are positional delete files that are applicable;
         //   * When there is a scan predicate and row_selection_enabled = true
-        // Note that, in the former case we only perform row group filtering when
+        // Note that row group filtering from predicates only happens when
         // there is a scan predicate AND row_group_filtering_enabled = true,
         // but we perform row selection filtering if there are applicable
         // equality delete files OR (there is a scan predicate AND row_selection_enabled),
@@ -241,6 +242,17 @@ impl ArrowReader {
         let mut selected_row_group_indices = None;
         let mut row_selection = None;
 
+        // Filter row groups based on byte range from task.start and task.length.
+        // If both start and length are 0, read the entire file (backwards compatibility).
+        if task.start != 0 || task.length != 0 {
+            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
+                record_batch_stream_builder.metadata(),
+                task.start,
+                task.length,
+            )?;
+            selected_row_group_indices = Some(byte_range_filtered_row_groups);
+        }
+
         if let Some(predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
@@ -256,14 +268,26 @@ impl ArrowReader {
             record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
 
             if row_group_filtering_enabled {
-                let result = Self::get_selected_row_group_indices(
+                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
                     &predicate,
                     record_batch_stream_builder.metadata(),
                     &field_id_map,
                     &task.schema,
                 )?;
 
-                selected_row_group_indices = Some(result);
+                // Merge predicate-based filtering with byte range filtering (if present)
+                // by taking the intersection of both filters
+                selected_row_group_indices = match selected_row_group_indices {
+                    Some(byte_range_filtered) => {
+                        // Keep only row groups that are in both filters
+                        let intersection: Vec<usize> = byte_range_filtered
+                            .into_iter()
+                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
+                            .collect();
+                        Some(intersection)
+                    }
+                    None => Some(predicate_filtered_row_groups),
+                };
             }
 
             if row_selection_enabled {
@@ -717,6 +741,36 @@ impl ArrowReader {
 
         Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
     }
+
+    /// Filters row groups by byte range to support Iceberg's file splitting.
+    ///
+    /// Iceberg splits large files at row group boundaries, so we only read row groups
+    /// whose byte ranges overlap with [start, start+length).
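+    ///
+    /// Illustrative example (hypothetical sizes, not taken from a real file): if a file
+    /// has three row groups of 1000 compressed bytes each, they span the byte ranges
+    /// [4, 1004), [1004, 2004) and [2004, 3004). A task with start=1004 and length=1000
+    /// overlaps only the second range, so only row group index 1 is selected.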
+    fn filter_row_groups_by_byte_range(
+        parquet_metadata: &Arc<ParquetMetaData>,
+        start: u64,
+        length: u64,
+    ) -> Result<Vec<usize>> {
+        let row_groups = parquet_metadata.row_groups();
+        let mut selected = Vec::new();
+        let end = start + length;
+
+        // Row groups are stored sequentially after the 4-byte magic header.
+        let mut current_byte_offset = 4u64;
+
+        for (idx, row_group) in row_groups.iter().enumerate() {
+            let row_group_size = row_group.compressed_size() as u64;
+            let row_group_end = current_byte_offset + row_group_size;
+
+            if current_byte_offset < end && start < row_group_end {
+                selected.push(idx);
+            }
+
+            current_byte_offset = row_group_end;
+        }
+
+        Ok(selected)
+    }
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
@@ -1949,6 +2003,194 @@ message schema {
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
 
+    /// Verifies that file splits respect byte ranges and only read specific row groups.
+    #[tokio::test]
+    async fn test_file_splits_respect_byte_ranges() {
+        use arrow_array::Int32Array;
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_path = format!("{}/multi_row_group.parquet", &table_location);
+
+        // Force each batch into its own row group for testing byte range filtering.
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (0..100).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (100..200).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (200..300).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .build();
+
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+        writer.write(&batch1).expect("Writing batch 1");
+        writer.write(&batch2).expect("Writing batch 2");
+        writer.write(&batch3).expect("Writing batch 3");
+        writer.close().unwrap();
+
+        // Read the file metadata to get row group byte positions
+        let file = File::open(&file_path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let metadata = reader.metadata();
+
+        println!("File has {} row groups", metadata.num_row_groups());
+        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
+
+        // Get byte positions for each row group
+        let row_group_0 = metadata.row_group(0);
+        let row_group_1 = metadata.row_group(1);
+        let row_group_2 = metadata.row_group(2);
+
+        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
+        let file_end = rg2_start + row_group_2.compressed_size() as u64;
+
+        println!(
+            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_0.num_rows(),
+            rg0_start,
+            row_group_0.compressed_size()
+        );
+        println!(
+            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_1.num_rows(),
+            rg1_start,
+            row_group_1.compressed_size()
+        );
+        println!(
+            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_2.num_rows(),
+            rg2_start,
+            row_group_2.compressed_size()
+        );
+
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        // Task 1: read only the first row group
+        let task1 = FileScanTask {
+            start: rg0_start,
+            length: row_group_0.compressed_size() as u64,
+            record_count: Some(100),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        // Task 2: read the second and third row groups
+        let task2 = FileScanTask {
+            start: rg1_start,
+            length: file_end - rg1_start,
+            record_count: Some(200),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
+        let result1 = reader
+            .clone()
+            .read(tasks1)
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
+        println!(
+            "Task 1 (bytes {}-{}) returned {} rows",
+            rg0_start,
+            rg0_start + row_group_0.compressed_size() as u64,
+            total_rows_task1
+        );
+
+        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
+        let result2 = reader
+            .read(tasks2)
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
+        println!(
+            "Task 2 (bytes {}-{}) returned {} rows",
+            rg1_start, file_end, total_rows_task2
+        );
+
+        assert_eq!(
+            total_rows_task1, 100,
+            "Task 1 should read only the first row group (100 rows), but got {} rows",
+            total_rows_task1
+        );
+
+        assert_eq!(
+            total_rows_task2, 200,
+            "Task 2 should read only the second+third row groups (200 rows), but got {} rows",
+            total_rows_task2
+        );
+
+        // Verify the actual data values are correct (not just the row count)
+        if total_rows_task1 > 0 {
+            let first_batch = &result1[0];
+            let id_col = first_batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let first_val = id_col.value(0);
+            let last_val = id_col.value(id_col.len() - 1);
+            println!("Task 1 data range: {} to {}", first_val, last_val);
+
+            assert_eq!(first_val, 0, "Task 1 should start with id=0");
+            assert_eq!(last_val, 99, "Task 1 should end with id=99");
+        }
+
+        if total_rows_task2 > 0 {
+            let first_batch = &result2[0];
+            let id_col = first_batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let first_val = id_col.value(0);
+            println!("Task 2 first value: {}", first_val);
+
+            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
+        }
+    }
+
     /// Test schema evolution: reading old Parquet file (with only column 'a')
     /// using a newer table schema (with columns 'a' and 'b').
     /// This tests that: