Skip to content

Commit 6354589

Browse files
authored
fix: allow nullable field of equality delete writer (#834)
According to the documentation fix in apache/iceberg#8981, the equality delete writer may reference an optional (nullable) field id. This PR updates the writer to allow nullable fields accordingly. --------- Co-authored-by: ZENOTME <[email protected]>
1 parent 46043a7 commit 6354589

File tree

5 files changed

+117
-11
lines changed

5 files changed

+117
-11
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ apache-avro = "0.17"
4343
array-init = "2"
4444
arrow-arith = { version = "53.3.0" }
4545
arrow-array = { version = "53.4.0" }
46+
arrow-buffer = { version = "53.4.0" }
4647
arrow-cast = { version = "53.4.0" }
4748
arrow-ord = { version = "53.4.0" }
4849
arrow-schema = { version = "53.4.0" }

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ apache-avro = { workspace = true }
4646
array-init = { workspace = true }
4747
arrow-arith = { workspace = true }
4848
arrow-array = { workspace = true }
49+
arrow-buffer = { workspace = true }
4950
arrow-cast = { workspace = true }
5051
arrow-ord = { workspace = true }
5152
arrow-schema = { workspace = true }

crates/iceberg/src/arrow/record_batch_projector.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use std::sync::Arc;
1919

20-
use arrow_array::{ArrayRef, RecordBatch, StructArray};
20+
use arrow_array::{make_array, ArrayRef, RecordBatch, StructArray};
21+
use arrow_buffer::NullBuffer;
2122
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
2223

2324
use crate::error::Result;
@@ -138,6 +139,7 @@ impl RecordBatchProjector {
138139
fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
139140
let mut rev_iterator = field_index.iter().rev();
140141
let mut array = batch[*rev_iterator.next().unwrap()].clone();
142+
let mut null_buffer = array.logical_nulls();
141143
for idx in rev_iterator {
142144
array = array
143145
.as_any()
@@ -148,8 +150,11 @@ impl RecordBatchProjector {
148150
))?
149151
.column(*idx)
150152
.clone();
153+
null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref());
151154
}
152-
Ok(array)
155+
Ok(make_array(
156+
array.to_data().into_builder().nulls(null_buffer).build()?,
157+
))
153158
}
154159
}
155160

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 107 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,12 @@ impl EqualityDeleteWriterConfig {
6767
original_arrow_schema,
6868
&equality_ids,
6969
// The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids
70+
// and https://iceberg.apache.org/spec/#equality-delete-files
7071
// - The identifier field ids must be used for primitive types.
7172
// - The identifier field ids must not be used for floating point types or nullable fields.
72-
// - The identifier field ids can be nested field of struct but not nested field of nullable struct.
7373
|field| {
7474
// Only primitive type is allowed to be used for identifier field ids
75-
if field.is_nullable()
76-
|| field.data_type().is_nested()
75+
if field.data_type().is_nested()
7776
|| matches!(
7877
field.data_type(),
7978
DataType::Float16 | DataType::Float32 | DataType::Float64
@@ -92,7 +91,7 @@ impl EqualityDeleteWriterConfig {
9291
.map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
9392
))
9493
},
95-
|field: &Field| !field.is_nullable(),
94+
|_field: &Field| true,
9695
)?;
9796
Ok(Self {
9897
equality_ids,
@@ -168,14 +167,17 @@ impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
168167

169168
#[cfg(test)]
170169
mod test {
170+
use std::collections::HashMap;
171171
use std::sync::Arc;
172172

173173
use arrow_array::types::Int32Type;
174174
use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
175-
use arrow_schema::DataType;
175+
use arrow_buffer::NullBuffer;
176+
use arrow_schema::{DataType, Field, Fields};
176177
use arrow_select::concat::concat_batches;
177178
use itertools::Itertools;
178179
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
180+
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
179181
use parquet::file::properties::WriterProperties;
180182
use tempfile::TempDir;
181183
use uuid::Uuid;
@@ -484,14 +486,10 @@ mod test {
484486
// Float and Double are not allowed to be used for equality delete
485487
assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err());
486488
assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err());
487-
// Int is nullable, not allowed to be used for equality delete
488-
assert!(EqualityDeleteWriterConfig::new(vec![2], schema.clone(), None).is_err());
489489
// Struct is not allowed to be used for equality delete
490490
assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err());
491491
// Nested field of struct is allowed to be used for equality delete
492492
assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok());
493-
// Nested field of optional struct is not allowed to be used for equality delete
494-
assert!(EqualityDeleteWriterConfig::new(vec![6], schema.clone(), None).is_err());
495493
// Nested field of map is not allowed to be used for equality delete
496494
assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err());
497495
assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err());
@@ -657,4 +655,104 @@ mod test {
657655

658656
Ok(())
659657
}
658+
659+
#[tokio::test]
660+
async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
661+
// prepare data
662+
// Int, Struct(Int), Struct(Struct(Int))
663+
let schema = Schema::builder()
664+
.with_schema_id(1)
665+
.with_fields(vec![
666+
NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
667+
NestedField::optional(
668+
1,
669+
"col1",
670+
Type::Struct(StructType::new(vec![NestedField::optional(
671+
2,
672+
"sub_col",
673+
Type::Primitive(PrimitiveType::Int),
674+
)
675+
.into()])),
676+
)
677+
.into(),
678+
NestedField::optional(
679+
3,
680+
"col2",
681+
Type::Struct(StructType::new(vec![NestedField::optional(
682+
4,
683+
"sub_struct_col",
684+
Type::Struct(StructType::new(vec![NestedField::optional(
685+
5,
686+
"sub_sub_col",
687+
Type::Primitive(PrimitiveType::Int),
688+
)
689+
.into()])),
690+
)
691+
.into()])),
692+
)
693+
.into(),
694+
])
695+
.build()
696+
.unwrap();
697+
let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
698+
// null 1 null(struct)
699+
// 2 null(struct) null(sub_struct_col)
700+
// 3 null(field) null(sub_sub_col)
701+
let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
702+
let col1 = {
703+
let nulls = NullBuffer::from(vec![true, false, true]);
704+
Arc::new(StructArray::new(
705+
if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
706+
fields.clone()
707+
} else {
708+
unreachable!()
709+
},
710+
vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
711+
Some(nulls),
712+
))
713+
};
714+
let col2 = {
715+
let inner_col = {
716+
let nulls = NullBuffer::from(vec![true, false, true]);
717+
Arc::new(StructArray::new(
718+
Fields::from(vec![Field::new("sub_sub_col", DataType::Int32, true)
719+
.with_metadata(HashMap::from([(
720+
PARQUET_FIELD_ID_META_KEY.to_string(),
721+
"5".to_string(),
722+
)]))]),
723+
vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
724+
Some(nulls),
725+
))
726+
};
727+
let nulls = NullBuffer::from(vec![false, true, true]);
728+
Arc::new(StructArray::new(
729+
if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() {
730+
fields.clone()
731+
} else {
732+
unreachable!()
733+
},
734+
vec![inner_col],
735+
Some(nulls),
736+
))
737+
};
738+
let columns = vec![col0, col1, col2];
739+
740+
let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
741+
let equality_ids = vec![0_i32, 2, 5];
742+
let equality_config =
743+
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
744+
let projector = equality_config.projector.clone();
745+
746+
// check
747+
let to_write_projected = projector.project_batch(to_write)?;
748+
let expect_batch =
749+
RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
750+
Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
751+
Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
752+
Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef,
753+
])
754+
.unwrap();
755+
assert_eq!(to_write_projected, expect_batch);
756+
Ok(())
757+
}
660758
}

0 commit comments

Comments
 (0)