From b2726715e592de62a7ac26f89ff219ee7a286140 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 23 Oct 2025 23:12:51 -0400 Subject: [PATCH 1/3] Removed the `evolve_schema()` call in `load_file_for_task()` for equality delete files. Added new test. --- .../src/arrow/caching_delete_file_loader.rs | 97 +++++++++++++++++-- 1 file changed, 90 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 9cf605680e..24cb1d962d 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -224,14 +224,12 @@ impl CachingDeleteFileLoader { let (sender, receiver) = channel(); del_filter.insert_equality_delete(&task.file_path, receiver); + // Equality deletes intentionally have partial schemas. Schema evolution would add + // NULL values for missing REQUIRED columns, causing Arrow validation to fail. Ok(DeleteFileContext::FreshEqDel { - batch_stream: BasicDeleteFileLoader::evolve_schema( - basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) - .await?, - schema, - ) - .await?, + batch_stream: basic_delete_file_loader + .parquet_to_batch_stream(&task.file_path) + .await?, sender, equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()), }) @@ -686,4 +684,89 @@ mod tests { let result = delete_filter.get_delete_vector(&file_scan_tasks[1]); assert!(result.is_none()); // no pos dels for file 3 } + + /// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow + /// validation errors when missing REQUIRED columns are filled with NULLs. + /// + /// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java. 
+ #[tokio::test] + async fn test_partial_schema_equality_deletes_evolve_fails() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().as_os_str().to_str().unwrap(); + + // Create table schema with REQUIRED fields + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + crate::spec::NestedField::required( + 1, + "id", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + crate::spec::NestedField::required( + 2, + "data", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + // Write equality delete file with PARTIAL schema (only 'data' column) + let delete_file_path = { + let data_vals = vec!["a", "d", "g"]; + let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef; + + let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field( + "data", + DataType::Utf8, + false, + "2", // field ID + )])); + + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap(); + + let path = format!("{}/partial-eq-deletes.parquet", &table_location); + let file = File::create(&path).unwrap(); + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let mut writer = + ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap(); + writer.write(&delete_batch).expect("Writing batch"); + writer.close().unwrap(); + path + }; + + let file_io = FileIO::from_path(table_location).unwrap().build().unwrap(); + let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + + let batch_stream = basic_delete_file_loader + .parquet_to_batch_stream(&delete_file_path) + .await + .unwrap(); + + let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema) + .await + .unwrap(); + + let result = evolved_stream.next().await.unwrap(); + + assert!( + result.is_err(), + "Expected error from evolve_schema adding 
NULL to non-nullable column" + ); + + let err = result.unwrap_err(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("non-nullable") || err_msg.contains("null values"), + "Expected null value error, got: {}", + err_msg + ); + } } From 7adc770fe1f681be438815166e2cd6b4566478a2 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 29 Oct 2025 15:49:00 -0400 Subject: [PATCH 2/3] Address PR feedback. --- .../src/arrow/caching_delete_file_loader.rs | 45 ++++++++++++------- .../iceberg/src/arrow/delete_file_loader.rs | 24 +++++----- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 24cb1d962d..e99616bc3b 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -534,6 +534,7 @@ mod tests { use std::fs::File; use std::sync::Arc; + use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray}; use arrow_schema::{DataType, Field, Fields}; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; @@ -685,12 +686,13 @@ mod tests { assert!(result.is_none()); // no pos dels for file 3 } - /// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow - /// validation errors when missing REQUIRED columns are filled with NULLs. + /// Verifies that evolve_schema on partial-schema equality deletes works correctly + /// when only equality_ids columns are evolved, not all table columns. /// - /// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java. + /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files), + /// equality delete files can contain only a subset of columns. 
#[tokio::test] - async fn test_partial_schema_equality_deletes_evolve_fails() { + async fn test_partial_schema_equality_deletes_evolve_succeeds() { let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().as_os_str().to_str().unwrap(); @@ -750,23 +752,32 @@ mod tests { .await .unwrap(); - let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema) - .await - .unwrap(); + // Only evolve the equality_ids columns (field 2), not all table columns + let equality_ids = vec![2]; + let evolved_stream = + BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, &equality_ids) + .await + .unwrap(); - let result = evolved_stream.next().await.unwrap(); + let result = evolved_stream.try_collect::<Vec<_>>().await; assert!( - result.is_err(), - "Expected error from evolve_schema adding NULL to non-nullable column" + result.is_ok(), + "Expected success when evolving only equality_ids columns, got error: {:?}", + result.err() ); - let err = result.unwrap_err(); - let err_msg = err.to_string(); - assert!( - err_msg.contains("non-nullable") || err_msg.contains("null values"), - "Expected null value error, got: {}", - err_msg - ); + let batches = result.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 1); // Only 'data' column + + // Verify the actual values are preserved after schema evolution + let data_col = batch.column(0).as_string::<i32>(); + assert_eq!(data_col.value(0), "a"); + assert_eq!(data_col.value(1), "d"); + assert_eq!(data_col.value(2), "g"); } } diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 592ef2eb4a..14551818ff 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -71,20 +71,17 @@ impl BasicDeleteFileLoader { Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } - /// Evolves the schema of the 
RecordBatches from an equality delete file + /// Evolves the schema of the RecordBatches from an equality delete file. + /// + /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files), + /// only evolves the specified `equality_ids` columns, not all table columns. pub(crate) async fn evolve_schema( record_batch_stream: ArrowRecordBatchStream, target_schema: Arc<Schema>, + equality_ids: &[i32], ) -> Result<ArrowRecordBatchStream> { - let eq_ids = target_schema - .as_ref() - .field_id_to_name_map() - .keys() - .cloned() - .collect::<Vec<_>>(); - let mut record_batch_transformer = - RecordBatchTransformer::build(target_schema.clone(), &eq_ids); + RecordBatchTransformer::build(target_schema.clone(), equality_ids); let record_batch_stream = record_batch_stream.map(move |record_batch| { record_batch.and_then(|record_batch| { @@ -105,7 +102,14 @@ impl DeleteFileLoader for BasicDeleteFileLoader { ) -> Result<ArrowRecordBatchStream> { let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?; - Self::evolve_schema(raw_batch_stream, schema).await + // For equality deletes, only evolve the equality_ids columns. + // For positional deletes (equality_ids is None), use all field IDs. + let field_ids = match &task.equality_ids { + Some(ids) => ids.clone(), + None => schema.field_id_to_name_map().keys().cloned().collect(), + }; + + Self::evolve_schema(raw_batch_stream, schema, &field_ids).await } } From 8a32ebb7d2ad230e80b7c7c0a05b06cb2657379d Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 29 Oct 2025 15:52:53 -0400 Subject: [PATCH 3/3] Address PR feedback. 
--- .../src/arrow/caching_delete_file_loader.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index e99616bc3b..3e6b3a22d5 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -224,14 +224,22 @@ impl CachingDeleteFileLoader { let (sender, receiver) = channel(); del_filter.insert_equality_delete(&task.file_path, receiver); - // Equality deletes intentionally have partial schemas. Schema evolution would add - // NULL values for missing REQUIRED columns, causing Arrow validation to fail. - Ok(DeleteFileContext::FreshEqDel { - batch_stream: basic_delete_file_loader + // Per the Iceberg spec, evolve schema for equality deletes but only for the + // equality_ids columns, not all table columns. + let equality_ids_vec = task.equality_ids.clone().unwrap(); + let evolved_stream = BasicDeleteFileLoader::evolve_schema( + basic_delete_file_loader .parquet_to_batch_stream(&task.file_path) .await?, + schema, + &equality_ids_vec, + ) + .await?; + + Ok(DeleteFileContext::FreshEqDel { + batch_stream: evolved_stream, sender, - equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()), + equality_ids: HashSet::from_iter(equality_ids_vec), }) }