From 54530bbb6a13659c89cfc4f92ea9f709b0139884 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Oct 2025 15:29:07 -0400 Subject: [PATCH 1/5] Stash. --- .../src/arrow/caching_delete_file_loader.rs | 99 +++++++++++++++++++ crates/iceberg/src/arrow/delete_filter.rs | 5 +- 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 9cf605680e..68dd5c9ba0 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -686,4 +686,103 @@ mod tests { let result = delete_filter.get_delete_vector(&file_scan_tasks[1]); assert!(result.is_none()); // no pos dels for file 3 } + + /// Test loading a FileScanTask with BOTH positional and equality deletes. + /// This reproduces the "Missing predicate for equality delete file" error from TestSparkExecutorCache. + #[tokio::test] + async fn test_load_deletes_with_mixed_types() { + use crate::scan::FileScanTask; + use crate::spec::{DataFileFormat, Schema}; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Create the data file schema + let data_file_schema = Arc::new( + Schema::builder() + .with_fields(vec![ + crate::spec::NestedField::optional(2, "y", crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long)).into(), + crate::spec::NestedField::optional(3, "z", crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long)).into(), + ]) + .build() + .unwrap() + ); + + // Write positional delete file + let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema(); + let file_path_values = vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4]; + let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3])); + + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col, + pos_col, + ]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap()); + let file = File::create(&pos_del_path).unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + writer.write(&positional_deletes_to_write).unwrap(); + writer.close().unwrap(); + + // Write equality delete file + let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap()); + + // Create FileScanTask with BOTH positional and equality deletes + let pos_del = FileScanTaskDeleteFile { + file_path: pos_del_path, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }; + + let eq_del = FileScanTaskDeleteFile { + file_path: eq_delete_path.clone(), + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![2, 3, 4, 6]), // field IDs from the equality delete schema + }; + + let file_scan_task = FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()), + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![2, 3], + predicate: None, + deletes: vec![pos_del, eq_del], // BOTH types of deletes! + }; + + // Load the deletes - this should handle both types + let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_filter = delete_file_loader + .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref()) + .await + .unwrap() + .unwrap(); + + // Try to build the equality delete predicate - this should fail with + // "Missing predicate for equality delete file" if the bug exists + let result = delete_filter.build_equality_delete_predicate(&file_scan_task).await; + + // For now, this should fail, but once we fix the bug, this assertion should pass + assert!(result.is_ok(), "Expected to successfully build equality delete predicate, but got error: {:?}", result.err()); + } } diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index b853baa993..7cc15e3a40 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -68,10 +68,13 @@ impl DeleteFilter { pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option> { let mut state = self.state.write().unwrap(); - if !state.equality_deletes.contains_key(file_path) { + // If this equality delete file is already being loaded or has been loaded, return None + // to indicate that another task is handling it or it's already available + if state.equality_deletes.contains_key(file_path) { return None; } + // First time seeing this equality delete file - mark it as Loading let notifier = Arc::new(Notify::new()); state .equality_deletes From 32dddbe8b772e807dbcb05e59ca79f752de3f387 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Oct 2025 16:37:59 -0400 Subject: [PATCH 2/5] Clean up comments. --- .../src/arrow/caching_delete_file_loader.rs | 40 +++++++++++++------ crates/iceberg/src/arrow/delete_filter.rs | 5 +-- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 68dd5c9ba0..c8a3f0a8ac 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -688,7 +688,7 @@ mod tests { } /// Test loading a FileScanTask with BOTH positional and equality deletes. - /// This reproduces the "Missing predicate for equality delete file" error from TestSparkExecutorCache. + /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors. #[tokio::test] async fn test_load_deletes_with_mixed_types() { use crate::scan::FileScanTask; @@ -705,16 +705,27 @@ mod tests { let data_file_schema = Arc::new( Schema::builder() .with_fields(vec![ - crate::spec::NestedField::optional(2, "y", crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long)).into(), - crate::spec::NestedField::optional(3, "z", crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long)).into(), + crate::spec::NestedField::optional( + 2, + "y", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), + crate::spec::NestedField::optional( + 3, + "z", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), ]) .build() - .unwrap() + .unwrap(), ); // Write positional delete file let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema(); - let file_path_values = vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4]; + let file_path_values = + vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4]; let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values)); let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3])); @@ -767,10 +778,10 @@ mod tests { schema: data_file_schema.clone(), project_field_ids: vec![2, 3], predicate: None, - deletes: vec![pos_del, eq_del], // BOTH types of deletes! + deletes: vec![pos_del, eq_del], }; - // Load the deletes - this should handle both types + // Load the deletes - should handle both types without error let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); let delete_filter = delete_file_loader .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref()) @@ -778,11 +789,14 @@ mod tests { .unwrap() .unwrap(); - // Try to build the equality delete predicate - this should fail with - // "Missing predicate for equality delete file" if the bug exists - let result = delete_filter.build_equality_delete_predicate(&file_scan_task).await; - - // For now, this should fail, but once we fix the bug, this assertion should pass - assert!(result.is_ok(), "Expected to successfully build equality delete predicate, but got error: {:?}", result.err()); + // Verify both delete types can be processed together + let result = delete_filter + .build_equality_delete_predicate(&file_scan_task) + .await; + assert!( + result.is_ok(), + "Failed to build equality delete predicate: {:?}", + result.err() + ); } } diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 7cc15e3a40..4250974bcd 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -68,13 +68,12 @@ impl DeleteFilter { pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option> { let mut state = self.state.write().unwrap(); - // If this equality delete file is already being loaded or has been loaded, return None - // to indicate that another task is handling it or it's already available + // Skip if already loaded/loading - another task owns it if state.equality_deletes.contains_key(file_path) { return None; } - // First time seeing this equality delete file - mark it as Loading + // Mark as loading to prevent duplicate work let notifier = Arc::new(Notify::new()); state .equality_deletes From d1ec371ed7fdc6ca5d642fe94294a17a7ebba023 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Oct 2025 17:46:57 -0400 Subject: [PATCH 3/5] Clean up comments and paths. --- .../src/arrow/caching_delete_file_loader.rs | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index c8a3f0a8ac..afae84c27b 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -687,8 +687,9 @@ mod tests { assert!(result.is_none()); // no pos dels for file 3 } - /// Test loading a FileScanTask with BOTH positional and equality deletes. - /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors. + /// Test loading a FileScanTask with both positional and equality deletes. + /// Verifies the fix for the inverted condition that caused "Missing predicate for equality + /// delete file" errors first encountered when running Iceberg Java's TestSparkExecutorCache #[tokio::test] async fn test_load_deletes_with_mixed_types() { use crate::scan::FileScanTask; @@ -701,28 +702,16 @@ mod tests { .build() .unwrap(); - // Create the data file schema let data_file_schema = Arc::new( Schema::builder() .with_fields(vec![ - crate::spec::NestedField::optional( - 2, - "y", - crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), - ) - .into(), - crate::spec::NestedField::optional( - 3, - "z", - crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), - ) - .into(), + NestedField::optional(2, "y", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(3, "z", Type::Primitive(PrimitiveType::Long)).into(), ]) .build() .unwrap(), ); - // Write positional delete file let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema(); let file_path_values = vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4]; @@ -751,7 +740,6 @@ mod tests { writer.write(&positional_deletes_to_write).unwrap(); writer.close().unwrap(); - // Write equality delete file let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap()); // Create FileScanTask with BOTH positional and equality deletes From b95b7234d3bd215b13404520aeedadeb224205c9 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 27 Oct 2025 10:21:41 -0400 Subject: [PATCH 4/5] Fix test that will conflict with changes in #1782. --- .../src/arrow/caching_delete_file_loader.rs | 121 ++++++++++++++++-- 1 file changed, 108 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index afae84c27b..fdb063c630 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -224,14 +224,12 @@ impl CachingDeleteFileLoader { let (sender, receiver) = channel(); del_filter.insert_equality_delete(&task.file_path, receiver); + // Equality deletes intentionally have partial schemas. Schema evolution would add + // NULL values for missing REQUIRED columns, causing Arrow validation to fail. Ok(DeleteFileContext::FreshEqDel { - batch_stream: BasicDeleteFileLoader::evolve_schema( - basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) - .await?, - schema, - ) - .await?, + batch_stream: basic_delete_file_loader + .parquet_to_batch_stream(&task.file_path) + .await?, sender, equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()), }) @@ -687,9 +685,93 @@ mod tests { assert!(result.is_none()); // no pos dels for file 3 } - /// Test loading a FileScanTask with both positional and equality deletes. - /// Verifies the fix for the inverted condition that caused "Missing predicate for equality - /// delete file" errors first encountered when running Iceberg Java's TestSparkExecutorCache + /// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow + /// validation errors when missing REQUIRED columns are filled with NULLs. + /// + /// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java. + #[tokio::test] + async fn test_partial_schema_equality_deletes_evolve_fails() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().as_os_str().to_str().unwrap(); + + // Create table schema with REQUIRED fields + let table_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + crate::spec::NestedField::required( + 1, + "id", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + crate::spec::NestedField::required( + 2, + "data", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + // Write equality delete file with PARTIAL schema (only 'data' column) + let delete_file_path = { + let data_vals = vec!["a", "d", "g"]; + let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef; + + let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field( + "data", + DataType::Utf8, + false, + "2", // field ID + )])); + + let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap(); + + let path = format!("{}/partial-eq-deletes.parquet", &table_location); + let file = File::create(&path).unwrap(); + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let mut writer = + ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap(); + writer.write(&delete_batch).expect("Writing batch"); + writer.close().unwrap(); + path + }; + + let file_io = FileIO::from_path(table_location).unwrap().build().unwrap(); + let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + + let batch_stream = basic_delete_file_loader + .parquet_to_batch_stream(&delete_file_path) + .await + .unwrap(); + + let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema) + .await + .unwrap(); + + let result = evolved_stream.next().await.unwrap(); + + assert!( + result.is_err(), + "Expected error from evolve_schema adding NULL to non-nullable column" + ); + + let err = result.unwrap_err(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("non-nullable") || err_msg.contains("null values"), + "Expected null value error, got: {}", + err_msg + ); + } + + /// Test loading a FileScanTask with BOTH positional and equality deletes. + /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors. #[tokio::test] async fn test_load_deletes_with_mixed_types() { use crate::scan::FileScanTask; @@ -702,16 +784,28 @@ mod tests { .build() .unwrap(); + // Create the data file schema let data_file_schema = Arc::new( Schema::builder() .with_fields(vec![ - NestedField::optional(2, "y", Type::Primitive(PrimitiveType::Long)).into(), - NestedField::optional(3, "z", Type::Primitive(PrimitiveType::Long)).into(), + crate::spec::NestedField::optional( + 2, + "y", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), + crate::spec::NestedField::optional( + 3, + "z", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), ]) .build() .unwrap(), ); + // Write positional delete file let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema(); let file_path_values = vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4]; @@ -740,6 +834,7 @@ mod tests { writer.write(&positional_deletes_to_write).unwrap(); writer.close().unwrap(); + // Write equality delete file let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap()); // Create FileScanTask with BOTH positional and equality deletes @@ -754,7 +849,7 @@ mod tests { file_path: eq_delete_path.clone(), file_type: DataContentType::EqualityDeletes, partition_spec_id: 0, - equality_ids: Some(vec![2, 3, 4, 6]), // field IDs from the equality delete schema + equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas }; let file_scan_task = FileScanTask { From 27f175589c49ac9cebe0d557640bd21de8cbc217 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 29 Oct 2025 15:38:50 -0400 Subject: [PATCH 5/5] Remove unintended changes from #1782. --- .../src/arrow/caching_delete_file_loader.rs | 97 ++----------------- 1 file changed, 7 insertions(+), 90 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index fdb063c630..006f96e8a3 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -224,12 +224,14 @@ impl CachingDeleteFileLoader { let (sender, receiver) = channel(); del_filter.insert_equality_delete(&task.file_path, receiver); - // Equality deletes intentionally have partial schemas. Schema evolution would add - // NULL values for missing REQUIRED columns, causing Arrow validation to fail. Ok(DeleteFileContext::FreshEqDel { - batch_stream: basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) - .await?, + batch_stream: BasicDeleteFileLoader::evolve_schema( + basic_delete_file_loader + .parquet_to_batch_stream(&task.file_path) + .await?, + schema, + ) + .await?, sender, equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()), }) @@ -685,91 +687,6 @@ mod tests { assert!(result.is_none()); // no pos dels for file 3 } - /// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow - /// validation errors when missing REQUIRED columns are filled with NULLs. - /// - /// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java. - #[tokio::test] - async fn test_partial_schema_equality_deletes_evolve_fails() { - let tmp_dir = TempDir::new().unwrap(); - let table_location = tmp_dir.path().as_os_str().to_str().unwrap(); - - // Create table schema with REQUIRED fields - let table_schema = Arc::new( - Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - crate::spec::NestedField::required( - 1, - "id", - crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int), - ) - .into(), - crate::spec::NestedField::required( - 2, - "data", - crate::spec::Type::Primitive(crate::spec::PrimitiveType::String), - ) - .into(), - ]) - .build() - .unwrap(), - ); - - // Write equality delete file with PARTIAL schema (only 'data' column) - let delete_file_path = { - let data_vals = vec!["a", "d", "g"]; - let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef; - - let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field( - "data", - DataType::Utf8, - false, - "2", // field ID - )])); - - let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap(); - - let path = format!("{}/partial-eq-deletes.parquet", &table_location); - let file = File::create(&path).unwrap(); - let props = WriterProperties::builder() - .set_compression(Compression::SNAPPY) - .build(); - let mut writer = - ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap(); - writer.write(&delete_batch).expect("Writing batch"); - writer.close().unwrap(); - path - }; - - let file_io = FileIO::from_path(table_location).unwrap().build().unwrap(); - let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); - - let batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&delete_file_path) - .await - .unwrap(); - - let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema) - .await - .unwrap(); - - let result = evolved_stream.next().await.unwrap(); - - assert!( - result.is_err(), - "Expected error from evolve_schema adding NULL to non-nullable column" - ); - - let err = result.unwrap_err(); - let err_msg = err.to_string(); - assert!( - err_msg.contains("non-nullable") || err_msg.contains("null values"), - "Expected null value error, got: {}", - err_msg - ); - } - /// Test loading a FileScanTask with BOTH positional and equality deletes. /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors. #[tokio::test]