From 20058222861bf56334aba1bad30e113af81d3f06 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 30 Oct 2025 15:15:54 -0400 Subject: [PATCH 1/5] Fix position delete with multiple row groups. --- crates/iceberg/src/arrow/reader.rs | 191 ++++++++++++++++++++++++++++- 1 file changed, 189 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 3c18c0337b..8e61feeacc 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -424,6 +424,7 @@ impl ArrowReader { // the remainder of this row group and skip to the next row group if next_deleted_row_idx >= next_row_group_base_idx { results.push(RowSelector::select(row_group_num_rows as usize)); + current_row_group_base_idx += row_group_num_rows; continue; } @@ -433,6 +434,7 @@ impl ArrowReader { // If there are no more pos deletes, add a selector for the entirety of this row group. _ => { results.push(RowSelector::select(row_group_num_rows as usize)); + current_row_group_base_idx += row_group_num_rows; continue; } }; @@ -1496,8 +1498,10 @@ mod tests { use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; use crate::io::FileIO; - use crate::scan::{FileScanTask, FileScanTaskStream}; - use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type}; + use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream}; + use crate::spec::{ + DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type, + }; fn table_schema_simple() -> SchemaRef { Arc::new( @@ -2280,4 +2284,187 @@ message schema { assert!(col_b.is_null(1)); assert!(col_b.is_null(2)); } + + /// Test for bug where position deletes in later row groups are not applied correctly. + /// + /// When a file has multiple row groups and a position delete targets a row in a later + /// row group, the `build_deletes_row_selection` function had a bug where it would + /// fail to increment `current_row_group_base_idx` when skipping row groups. + /// + /// This test creates: + /// - A data file with 200 rows split into 2 row groups (0-99, 100-199) + /// - A position delete file that deletes row 199 (last row in second row group) + /// + /// Expected behavior: Should return 199 rows (with id=200 deleted) + /// Bug behavior: Returns 200 rows (delete is not applied) + /// + /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests + /// through DataFusion Comet. 
The following Iceberg Java tests failed due to this bug: + /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet` + /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet` + #[tokio::test] + async fn test_position_delete_across_multiple_row_groups() { + use arrow_array::{Int32Array, Int64Array}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + // Field IDs for positional delete schema + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + + // Create table schema with a single 'id' column + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Step 1: Create data file with 200 rows in 2 row groups + // Row group 0: rows 0-99 (ids 1-100) + // Row group 1: rows 100-199 (ids 101-200) + let data_file_path = format!("{}/data.parquet", &table_location); + + let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(1..=100), + )]) + .unwrap(); + + let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(101..=200), + )]) + .unwrap(); + + // Force each batch into its own row group + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(100) + .build(); + + let file = File::create(&data_file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&batch1).expect("Writing batch 1"); + writer.write(&batch2).expect("Writing batch 2"); + writer.close().unwrap(); + + // Verify we created 2 row groups + let verify_file = File::open(&data_file_path).unwrap(); + let verify_reader = SerializedFileReader::new(verify_file).unwrap(); + assert_eq!( + verify_reader.metadata().num_row_groups(), + 2, + "Should have 2 row groups" + ); + + // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1) + let delete_file_path = format!("{}/deletes.parquet", &table_location); + + let delete_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )])), + ])); + + // Delete row at position 199 (0-indexed, so it's the last row: id=200) + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![ + Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), + Arc::new(Int64Array::from_iter_values(vec![199i64])), + ]) + .unwrap(); + + let delete_props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let delete_file = File::create(&delete_file_path).unwrap(); + let mut delete_writer = + ArrowWriter::try_new(delete_file, delete_schema, 
Some(delete_props)).unwrap(); + delete_writer.write(&delete_batch).unwrap(); + delete_writer.close().unwrap(); + + // Step 3: Read the data file with the delete applied + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + let reader = ArrowReaderBuilder::new(file_io).build(); + + let task = FileScanTask { + start: 0, + length: 0, + record_count: Some(200), + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: delete_file_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Step 4: Verify we got 199 rows (not 200) + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + + println!("Total rows read: {}", total_rows); + println!("Expected: 199 rows (deleted row 199 which had id=200)"); + + // This assertion will FAIL before the fix and PASS after the fix + assert_eq!( + total_rows, 199, + "Expected 199 rows after deleting row 199, but got {} rows. \ + The bug causes position deletes in later row groups to be ignored.", + total_rows + ); + + // Verify the deleted row (id=200) is not present + let all_ids: Vec = result + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_primitive::() + .values() + .iter() + .copied() + }) + .collect(); + + assert!( + !all_ids.contains(&200), + "Row with id=200 should be deleted but was found in results" + ); + + // Verify we have all other ids (1-199) + let expected_ids: Vec = (1..=199).collect(); + assert_eq!( + all_ids, expected_ids, + "Should have ids 1-199 but got different values" + ); + } } From c1a6359d7a6e2e3cf1a6eb103c6c677f71fc63f7 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 30 Oct 2025 16:54:39 -0400 Subject: [PATCH 2/5] Fix position delete with skipping row groups. --- crates/iceberg/src/arrow/reader.rs | 238 ++++++++++++++++++++++++++++- 1 file changed, 234 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 8e61feeacc..79a8823477 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -406,10 +406,11 @@ impl ArrowReader { // selected row group selected_row_groups_idx += 1; } else { - // remove any positional deletes from the skipped page so that - // `positional.deletes.min()` can be used - delete_vector_iter.advance_to(next_row_group_base_idx); - next_deleted_row_idx_opt = delete_vector_iter.next(); + // Advance iterator past all deletes in the skipped row group. + // Note: advance_to() positions the iterator without consuming elements, + // so we must NOT call next() here or we'll incorrectly consume a delete + // that belongs to a later selected row group. + delete_vector_iter.advance_to(next_row_group_base_idx);\ // still increment the current page base index but then skip to the next row group // in the file @@ -2467,4 +2468,233 @@ message schema { "Should have ids 1-199 but got different values" ); } + + /// Test for bug where position deletes are lost when skipping unselected row groups. + /// + /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises + /// the row group selection code path (`selected_row_groups: Some([...])`). 
+ /// + /// When a file has multiple row groups and only some are selected for reading, + /// the `build_deletes_row_selection` function must correctly skip over deletes in + /// unselected row groups WITHOUT consuming deletes that belong to selected row groups. + /// + /// This test creates: + /// - A data file with 200 rows split into 2 row groups (0-99, 100-199) + /// - A position delete file that deletes row 199 (last row in second row group) + /// - Row group selection that reads ONLY row group 1 (rows 100-199) + /// + /// Expected behavior: Should return 99 rows (with row 199 deleted) + /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0) + /// + /// The bug occurs when processing row group 0 (unselected): + /// ```rust + /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100 + /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199! + /// ``` + /// + /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups, + /// because `advance_to()` already positions the iterator correctly without consuming elements. + #[tokio::test] + async fn test_position_delete_with_row_group_selection() { + use arrow_array::{Int32Array, Int64Array}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + // Field IDs for positional delete schema + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + + // Create table schema with a single 'id' column + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([ + (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()), + ])), + ])); + + // Step 1: Create data file with 200 rows in 2 row groups + // Row group 0: rows 0-99 (ids 1-100) + // Row group 1: rows 100-199 (ids 101-200) + let data_file_path = format!("{}/data.parquet", &table_location); + + let batch1 = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(1..=100))], + ) + .unwrap(); + + let batch2 = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(101..=200))], + ) + .unwrap(); + + // Force each batch into its own row group + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(100) + .build(); + + let file = File::create(&data_file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&batch1).expect("Writing batch 1"); + writer.write(&batch2).expect("Writing batch 2"); + writer.close().unwrap(); + + // Verify we created 2 row groups + let verify_file = File::open(&data_file_path).unwrap(); + let verify_reader = SerializedFileReader::new(verify_file).unwrap(); + assert_eq!( + verify_reader.metadata().num_row_groups(), + 2, + "Should have 2 row groups" + ); + + // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1) + let delete_file_path = format!("{}/deletes.parquet", &table_location); + + let delete_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", 
DataType::Utf8, false).with_metadata(HashMap::from([ + ( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + ), + ])), + Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([ + ( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + ), + ])), + ])); + + // Delete row at position 199 (0-indexed, so it's the last row: id=200) + let delete_batch = RecordBatch::try_new( + delete_schema.clone(), + vec![ + Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), + Arc::new(Int64Array::from_iter_values(vec![199i64])), + ], + ) + .unwrap(); + + let delete_props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let delete_file = File::create(&delete_file_path).unwrap(); + let mut delete_writer = + ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap(); + delete_writer.write(&delete_batch).unwrap(); + delete_writer.close().unwrap(); + + // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199) + // This exercises the row group selection code path where row group 0 is skipped + let metadata_file = File::open(&data_file_path).unwrap(); + let metadata_reader = SerializedFileReader::new(metadata_file).unwrap(); + let metadata = metadata_reader.metadata(); + + let row_group_0 = metadata.row_group(0); + let row_group_1 = metadata.row_group(1); + + let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1" + let rg1_start = rg0_start + row_group_0.compressed_size() as u64; + let rg1_length = row_group_1.compressed_size() as u64; + + println!( + "Row group 0: starts at byte {}, {} bytes compressed", + rg0_start, + row_group_0.compressed_size() + ); + println!( + "Row group 1: starts at byte {}, {} bytes compressed", + rg1_start, + row_group_1.compressed_size() + ); + + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + let reader = ArrowReaderBuilder::new(file_io).build(); + + // Create FileScanTask that reads ONLY row group 1 via byte range filtering + let task = FileScanTask { + start: rg1_start, + length: rg1_length, + record_count: Some(100), // Row group 1 has 100 rows + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: delete_file_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + partition_spec_id: None, + partition_spec: None, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Step 4: Verify we got 99 rows (not 100) + // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + + println!("Total rows read from row group 1: {}", total_rows); + println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)"); + + // This assertion will FAIL before the fix and PASS after the fix + assert_eq!( + total_rows, 99, + "Expected 99 rows from row group 1 after deleting position 199, but got {} rows. 
\ + The bug causes position deletes to be lost when advance_to() is followed by next() \ + when skipping unselected row groups.", + total_rows + ); + + // Verify the deleted row (id=200) is not present + let all_ids: Vec = result + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_primitive::() + .values() + .iter() + .copied() + }) + .collect(); + + assert!( + !all_ids.contains(&200), + "Row with id=200 should be deleted but was found in results" + ); + + // Verify we have ids 101-199 (not 101-200) + let expected_ids: Vec = (101..=199).collect(); + assert_eq!( + all_ids, expected_ids, + "Should have ids 101-199 but got different values" + ); + } + } From 3729c8cadae57408fc656dce57c67b8c2fd8aecc Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 30 Oct 2025 16:55:51 -0400 Subject: [PATCH 3/5] Fix typo. --- crates/iceberg/src/arrow/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 79a8823477..a9ba56d7a2 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -410,7 +410,7 @@ impl ArrowReader { // Note: advance_to() positions the iterator without consuming elements, // so we must NOT call next() here or we'll incorrectly consume a delete // that belongs to a later selected row group. - delete_vector_iter.advance_to(next_row_group_base_idx);\ + delete_vector_iter.advance_to(next_row_group_base_idx); // still increment the current page base index but then skip to the next row group // in the file From 040b5e517d6fabacde71dbfa0a20b1d078344f09 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 30 Oct 2025 16:57:55 -0400 Subject: [PATCH 4/5] Fix errant code from df50 branch. --- crates/iceberg/src/arrow/reader.rs | 66 ++++++++++++------------------ 1 file changed, 27 insertions(+), 39 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index a9ba56d7a2..025d5489fb 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -2488,8 +2488,8 @@ message schema { /// /// The bug occurs when processing row group 0 (unselected): /// ```rust - /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100 - /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199! + /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100 + /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199! 
/// ``` /// /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups, @@ -2518,9 +2518,10 @@ message schema { ); let arrow_schema = Arc::new(ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([ - (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()), - ])), + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), ])); // Step 1: Create data file with 200 rows in 2 row groups @@ -2528,17 +2529,15 @@ message schema { // Row group 1: rows 100-199 (ids 101-200) let data_file_path = format!("{}/data.parquet", &table_location); - let batch1 = RecordBatch::try_new( - arrow_schema.clone(), - vec![Arc::new(Int32Array::from_iter_values(1..=100))], - ) - .unwrap(); + let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(1..=100), + )]) + .unwrap(); - let batch2 = RecordBatch::try_new( - arrow_schema.clone(), - vec![Arc::new(Int32Array::from_iter_values(101..=200))], - ) - .unwrap(); + let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(101..=200), + )]) + .unwrap(); // Force each batch into its own row group let props = WriterProperties::builder() @@ -2565,29 +2564,22 @@ message schema { let delete_file_path = format!("{}/deletes.parquet", &table_location); let delete_schema = Arc::new(ArrowSchema::new(vec![ - Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([ - ( - PARQUET_FIELD_ID_META_KEY.to_string(), - FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), - ), - ])), - Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([ - ( - PARQUET_FIELD_ID_META_KEY.to_string(), - FIELD_ID_POSITIONAL_DELETE_POS.to_string(), - ), - ])), + Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )])), ])); // Delete row at position 199 (0-indexed, so it's the last row: id=200) - let delete_batch = RecordBatch::try_new( - delete_schema.clone(), - vec![ - Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), - Arc::new(Int64Array::from_iter_values(vec![199i64])), - ], - ) - .unwrap(); + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![ + Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), + Arc::new(Int64Array::from_iter_values(vec![199i64])), + ]) + .unwrap(); let delete_props = WriterProperties::builder() .set_compression(Compression::SNAPPY) @@ -2642,9 +2634,6 @@ message schema { partition_spec_id: 0, equality_ids: None, }], - partition: None, - partition_spec_id: None, - partition_spec: None, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2696,5 +2685,4 @@ message schema { "Should have ids 101-199 but got different values" ); } - } From 4a1e88cc649f53c35f86e73fec618a02721cad3c Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 30 Oct 2025 20:36:43 -0400 Subject: [PATCH 5/5] Fix infinite loop when delete is in skipped row group. 
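When a position delete falls inside a skipped (unselected) row group, the cached
next_deleted_row_idx_opt still holds that already-consumed position after
advance_to() moves the underlying iterator forward. While processing the next
selected row group, neither the select branch nor the skip branch can make
progress against that stale index, so build_deletes_row_selection loops forever.
The change below refreshes the cached value only when it lies in the skipped
range, so a cached delete that belongs to a later selected row group is still
not consumed.

A minimal, self-contained model of that re-sync rule (a sketch only:
SortedDeletes and its methods are illustrative stand-ins, not the crate's
DeleteVector iterator):

```rust
/// Illustrative stand-in for the delete-vector iterator: yields sorted,
/// file-wide row positions and can be advanced past a lower bound without
/// consuming the element it lands on.
struct SortedDeletes {
    deletes: Vec<u64>,
    pos: usize,
}

impl SortedDeletes {
    fn next(&mut self) -> Option<u64> {
        let v = self.deletes.get(self.pos).copied();
        self.pos += 1;
        v
    }

    /// Position at the first delete >= bound, consuming nothing at or past it.
    fn advance_to(&mut self, bound: u64) {
        while self.pos < self.deletes.len() && self.deletes[self.pos] < bound {
            self.pos += 1;
        }
    }
}

fn main() {
    // Delete at row 0 lives in skipped row group 0 (rows 0..100);
    // only row group 1 (rows 100..200) is selected.
    let mut iter = SortedDeletes { deletes: vec![0], pos: 0 };
    let mut cached = iter.next(); // Some(0): read before any row group is processed

    // Skip row group 0.
    let next_row_group_base_idx = 100u64;
    iter.advance_to(next_row_group_base_idx);

    // Refresh the cache only if it points into the skipped range; a cached
    // delete that already belongs to a selected row group must be kept.
    if let Some(idx) = cached {
        if idx < next_row_group_base_idx {
            cached = iter.next();
        }
    }

    assert_eq!(cached, None); // no deletes remain to apply to row group 1
}
```

If the delete sits at position 199 instead (the case fixed in the previous
patch), the cached value is >= the row group base index and is left untouched,
so the delete still reaches the selected row group.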
--- crates/iceberg/src/arrow/reader.rs | 208 ++++++++++++++++++++++++++++- 1 file changed, 205 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 025d5489fb..e0894ad6b4 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -407,10 +407,16 @@ impl ArrowReader { selected_row_groups_idx += 1; } else { // Advance iterator past all deletes in the skipped row group. - // Note: advance_to() positions the iterator without consuming elements, - // so we must NOT call next() here or we'll incorrectly consume a delete - // that belongs to a later selected row group. + // advance_to() positions the iterator to the first delete >= next_row_group_base_idx. + // However, if our cached next_deleted_row_idx_opt is in the skipped range, + // we need to call next() to update the cache with the newly positioned value. delete_vector_iter.advance_to(next_row_group_base_idx); + // Only update the cache if the cached value is stale (in the skipped range) + if let Some(cached_idx) = next_deleted_row_idx_opt { + if cached_idx < next_row_group_base_idx { + next_deleted_row_idx_opt = delete_vector_iter.next(); + } + } // still increment the current page base index but then skip to the next row group // in the file @@ -2685,4 +2691,200 @@ message schema { "Should have ids 101-199 but got different values" ); } + /// Test for bug where stale cached delete causes infinite loop when skipping row groups. + /// + /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`: + /// - Position delete targets a row in the SKIPPED row group (not the selected one) + /// - After calling advance_to(), the cached delete index is stale + /// - Without updating the cache, the code enters an infinite loop + /// + /// This test creates: + /// - A data file with 200 rows split into 2 row groups (0-99, 100-199) + /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0) + /// - Row group selection that reads ONLY row group 1 (rows 100-199) + /// + /// The bug occurs when skipping row group 0: + /// ```rust + /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0) + /// // ... skip to row group 1 ... + /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0 + /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE! + /// // When processing row group 1: + /// // current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200 + /// // Loop condition: 0 < 200 (true) + /// // But: current_idx (100) > next_deleted_row_idx (0) + /// // And: current_idx (100) != next_deleted_row_idx (0) + /// // Neither branch executes -> INFINITE LOOP! 
+ /// ``` + /// + /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1) + /// Bug behavior: Infinite loop in build_deletes_row_selection + #[tokio::test] + async fn test_position_delete_in_skipped_row_group() { + use arrow_array::{Int32Array, Int64Array}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + // Field IDs for positional delete schema + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + + // Create table schema with a single 'id' column + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Step 1: Create data file with 200 rows in 2 row groups + // Row group 0: rows 0-99 (ids 1-100) + // Row group 1: rows 100-199 (ids 101-200) + let data_file_path = format!("{}/data.parquet", &table_location); + + let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(1..=100), + )]) + .unwrap(); + + let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from_iter_values(101..=200), + )]) + .unwrap(); + + // Force each batch into its own row group + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(100) + .build(); + + let file = File::create(&data_file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + writer.write(&batch1).expect("Writing batch 1"); + writer.write(&batch2).expect("Writing batch 2"); + writer.close().unwrap(); + + // Verify we created 2 row groups + let verify_file = File::open(&data_file_path).unwrap(); + let verify_reader = SerializedFileReader::new(verify_file).unwrap(); + assert_eq!( + verify_reader.metadata().num_row_groups(), + 2, + "Should have 2 row groups" + ); + + // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0) + let delete_file_path = format!("{}/deletes.parquet", &table_location); + + let delete_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )])), + ])); + + // Delete row at position 0 (0-indexed, so it's the first row: id=1) + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![ + Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])), + Arc::new(Int64Array::from_iter_values(vec![0i64])), + ]) + .unwrap(); + + let delete_props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let delete_file = File::create(&delete_file_path).unwrap(); + let mut delete_writer = + ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap(); + delete_writer.write(&delete_batch).unwrap(); + delete_writer.close().unwrap(); + + // Step 3: Get byte ranges to 
read ONLY row group 1 (rows 100-199) + // This exercises the row group selection code path where row group 0 is skipped + let metadata_file = File::open(&data_file_path).unwrap(); + let metadata_reader = SerializedFileReader::new(metadata_file).unwrap(); + let metadata = metadata_reader.metadata(); + + let row_group_0 = metadata.row_group(0); + let row_group_1 = metadata.row_group(1); + + let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1" + let rg1_start = rg0_start + row_group_0.compressed_size() as u64; + let rg1_length = row_group_1.compressed_size() as u64; + + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + let reader = ArrowReaderBuilder::new(file_io).build(); + + // Create FileScanTask that reads ONLY row group 1 via byte range filtering + let task = FileScanTask { + start: rg1_start, + length: rg1_length, + record_count: Some(100), // Row group 1 has 100 rows + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: delete_file_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Step 4: Verify we got 100 rows (all of row group 1) + // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + + assert_eq!( + total_rows, 100, + "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \ + If this hangs or fails, it indicates the cached delete index was not updated after advance_to()." + ); + + // Verify we have all ids from row group 1 (101-200) + let all_ids: Vec = result + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_primitive::() + .values() + .iter() + .copied() + }) + .collect(); + + let expected_ids: Vec = (101..=200).collect(); + assert_eq!( + all_ids, expected_ids, + "Should have ids 101-200 (all of row group 1)" + ); + } }
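For reference, the accounting that these three tests pin down can be condensed
into a small model. This is an illustrative sketch only, assuming a single
delete list sorted by file-wide row position; selected_rows_after_deletes is a
made-up helper that returns just the surviving row count, whereas the crate's
build_deletes_row_selection emits a RowSelection of select/skip runs:

```rust
/// Count the rows that survive position deletes across the selected row groups.
/// `deletes` holds sorted, file-wide row positions; unselected row groups are
/// skipped, but their deletes must be passed over without consuming deletes
/// that belong to later, selected row groups.
fn selected_rows_after_deletes(
    row_group_sizes: &[u64],
    selected: &[bool],
    deletes: &[u64],
) -> u64 {
    let mut base = 0u64;
    let mut di = 0usize; // index of the next unprocessed delete
    let mut kept = 0u64;

    for (rg, &size) in row_group_sizes.iter().enumerate() {
        let next_base = base + size;
        if selected[rg] {
            // Count and consume the deletes that land in this row group.
            let mut deleted_here = 0u64;
            while di < deletes.len() && deletes[di] < next_base {
                deleted_here += 1;
                di += 1;
            }
            kept += size - deleted_here;
        } else {
            // Skip the row group, consuming only its own deletes.
            while di < deletes.len() && deletes[di] < next_base {
                di += 1;
            }
        }
        base = next_base;
    }
    kept
}

fn main() {
    let sizes = [100u64, 100];
    // Patch 1 scenario: both row groups read, delete at 199 -> 199 rows.
    assert_eq!(selected_rows_after_deletes(&sizes, &[true, true], &[199]), 199);
    // Patch 2 scenario: only row group 1 read, delete at 199 -> 99 rows.
    assert_eq!(selected_rows_after_deletes(&sizes, &[false, true], &[199]), 99);
    // Patch 5 scenario: only row group 1 read, delete at 0 -> 100 rows.
    assert_eq!(selected_rows_after_deletes(&sizes, &[false, true], &[0]), 100);
}
```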