From 41f106a6616b798f9cb8c1fbf7e9d5c87b741a4a Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 24 Dec 2024 00:02:23 +0800 Subject: [PATCH 1/2] fix nullable field of equality delete writer --- Cargo.lock | 1 + Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + .../src/arrow/record_batch_projector.rs | 9 ++- .../base_writer/equality_delete_writer.rs | 69 ++++++++++++++++--- 5 files changed, 71 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8957ae1b81..400175b81e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2931,6 +2931,7 @@ dependencies = [ "array-init", "arrow-arith", "arrow-array", + "arrow-buffer", "arrow-cast", "arrow-ord", "arrow-schema", diff --git a/Cargo.toml b/Cargo.toml index b3109beaf7..75350030e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ apache-avro = "0.17" array-init = "2" arrow-arith = { version = "53.3.0" } arrow-array = { version = "53.4.0" } +arrow-buffer = { version = "53.4.0" } arrow-cast = { version = "53.4.0" } arrow-ord = { version = "53.4.0" } arrow-schema = { version = "53.4.0" } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 626ca15ef7..ae918fff78 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -46,6 +46,7 @@ apache-avro = { workspace = true } array-init = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true } +arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index 9cd7456915..878d0fe28e 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use arrow_array::{ArrayRef, RecordBatch, StructArray}; +use arrow_array::{make_array, ArrayRef, RecordBatch, StructArray}; +use arrow_buffer::NullBuffer; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use crate::error::Result; @@ -138,6 +139,7 @@ impl RecordBatchProjector { fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result { let mut rev_iterator = field_index.iter().rev(); let mut array = batch[*rev_iterator.next().unwrap()].clone(); + let mut null_buffer = array.logical_nulls(); for idx in rev_iterator { array = array .as_any() @@ -148,8 +150,11 @@ impl RecordBatchProjector { ))? .column(*idx) .clone(); + null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref()); } - Ok(array) + Ok(make_array( + array.to_data().into_builder().nulls(null_buffer).build()?, + )) } } diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 069928fa8d..500075c21d 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -67,13 +67,12 @@ impl EqualityDeleteWriterConfig { original_arrow_schema, &equality_ids, // The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids + // and https://iceberg.apache.org/spec/#equality-delete-files // - The identifier field ids must be used for primitive types. // - The identifier field ids must not be used for floating point types or nullable fields. - // - The identifier field ids can be nested field of struct but not nested field of nullable struct. |field| { // Only primitive type is allowed to be used for identifier field ids - if field.is_nullable() - || field.data_type().is_nested() + if field.data_type().is_nested() || matches!( field.data_type(), DataType::Float16 | DataType::Float32 | DataType::Float64 @@ -92,7 +91,7 @@ impl EqualityDeleteWriterConfig { .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?, )) }, - |field: &Field| !field.is_nullable(), + |_field: &Field| true, )?; Ok(Self { equality_ids, @@ -172,6 +171,7 @@ mod test { use arrow_array::types::Int32Type; use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray}; + use arrow_buffer::NullBuffer; use arrow_schema::DataType; use arrow_select::concat::concat_batches; use itertools::Itertools; @@ -484,14 +484,10 @@ mod test { // Float and Double are not allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err()); assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err()); - // Int is nullable, not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![2], schema.clone(), None).is_err()); // Struct is not allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err()); // Nested field of struct is allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok()); - // Nested field of optional struct is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![6], schema.clone(), None).is_err()); // Nested field of map is not allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err()); assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err()); @@ -657,4 +653,61 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> { + // prepare data + // Int, Struct(Int) + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional( + 1, + "col1", + Type::Struct(StructType::new(vec![NestedField::optional( + 2, + "sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), + ) + .into(), + ]) + .build() + .unwrap(); + let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + // null 1 + // 2 null(struct) + // 3 null(field) + let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef; + let nulls = NullBuffer::from(vec![true, false, true]); + let col1 = Arc::new(StructArray::new( + if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() { + fields.clone() + } else { + unreachable!() + }, + vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))], + Some(nulls), + )); + let columns = vec![col0, col1]; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); + let equality_ids = vec![0_i32, 2]; + let equality_config = + EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); + let projector = equality_config.projector.clone(); + + // check + let to_write_projected = projector.project_batch(to_write)?; + let expect_batch = + RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![ + Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef, + Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef, + ]) + .unwrap(); + assert_eq!(to_write_projected, expect_batch); + Ok(()) + } } From afcecef26433ccee291b3865abcabfa9c917a223 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sat, 25 Jan 2025 12:55:44 +0800 Subject: [PATCH 2/2] add test case for deeply nested struct --- .../base_writer/equality_delete_writer.rs | 79 +++++++++++++++---- 1 file changed, 62 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 500075c21d..fb9682573b 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -167,15 +167,17 @@ impl IcebergWriter for EqualityDeleteFileWriter { #[cfg(test)] mod test { + use std::collections::HashMap; use std::sync::Arc; use arrow_array::types::Int32Type; use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray}; use arrow_buffer::NullBuffer; - use arrow_schema::DataType; + use arrow_schema::{DataType, Field, Fields}; use arrow_select::concat::concat_batches; use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::file::properties::WriterProperties; use tempfile::TempDir; use uuid::Uuid; @@ -657,7 +659,7 @@ mod test { #[tokio::test] async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> { // prepare data - // Int, Struct(Int) + // Int, Struct(Int), Struct(Struct(Int)) let schema = Schema::builder() .with_schema_id(1) .with_fields(vec![ @@ -673,28 +675,70 @@ mod test { .into()])), ) .into(), + NestedField::optional( + 3, + "col2", + Type::Struct(StructType::new(vec![NestedField::optional( + 4, + "sub_struct_col", + Type::Struct(StructType::new(vec![NestedField::optional( + 5, + "sub_sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), + ) + .into()])), + ) + .into(), ]) .build() .unwrap(); let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap()); - // null 1 - // 2 null(struct) - // 3 null(field) + // null 1 null(struct) + // 2 null(struct) null(sub_struct_col) + // 3 null(field) null(sub_sub_col) let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef; - let nulls = NullBuffer::from(vec![true, false, true]); - let col1 = Arc::new(StructArray::new( - if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() { - fields.clone() - } else { - unreachable!() - }, - vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))], - Some(nulls), - )); - let columns = vec![col0, col1]; + let col1 = { + let nulls = NullBuffer::from(vec![true, false, true]); + Arc::new(StructArray::new( + if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() { + fields.clone() + } else { + unreachable!() + }, + vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))], + Some(nulls), + )) + }; + let col2 = { + let inner_col = { + let nulls = NullBuffer::from(vec![true, false, true]); + Arc::new(StructArray::new( + Fields::from(vec![Field::new("sub_sub_col", DataType::Int32, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )]))]), + vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))], + Some(nulls), + )) + }; + let nulls = NullBuffer::from(vec![false, true, true]); + Arc::new(StructArray::new( + if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() { + fields.clone() + } else { + unreachable!() + }, + vec![inner_col], + Some(nulls), + )) + }; + let columns = vec![col0, col1, col2]; let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); - let equality_ids = vec![0_i32, 2]; + let equality_ids = vec![0_i32, 2, 5]; let equality_config = EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); let projector = equality_config.projector.clone(); @@ -705,6 +749,7 @@ mod test { RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![ Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef, Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef, + Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef, ]) .unwrap(); assert_eq!(to_write_projected, expect_batch);