From 623e1d1a03ecc7a4e2f9d2340635a9548d210af2 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 19 Nov 2024 17:42:44 +0800 Subject: [PATCH 1/7] copy from #372 --- .../base_writer/equality_delete_writer.rs | 589 ++++++++++++++++++ crates/iceberg/src/writer/base_writer/mod.rs | 1 + 2 files changed, 590 insertions(+) create mode 100644 crates/iceberg/src/writer/base_writer/equality_delete_writer.rs diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs new file mode 100644 index 0000000000..3fb98aaea3 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -0,0 +1,589 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provide `EqualityDeleteWriter`. + +use arrow_array::{ArrayRef, RecordBatch, StructArray}; +use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; +use itertools::Itertools; + +use crate::spec::{DataFile, Struct}; +use crate::writer::file_writer::FileWriter; +use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Builder for `EqualityDeleteWriter`. +#[derive(Clone)] +pub struct EqualityDeleteFileWriterBuilder { + inner: B, +} + +impl EqualityDeleteFileWriterBuilder { + /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`. + pub fn new(inner: B) -> Self { + Self { inner } + } +} + +/// Config for `EqualityDeleteWriter`. +pub struct EqualityDeleteWriterConfig { + equality_ids: Vec, + projector: FieldProjector, + schema: SchemaRef, + partition_value: Struct, +} + +impl EqualityDeleteWriterConfig { + /// Create a new `DataFileWriterConfig` with equality ids. 
+ pub fn new( + equality_ids: Vec, + projector: FieldProjector, + schema: Schema, + partition_value: Option, + ) -> Self { + Self { + equality_ids, + projector, + schema: schema.into(), + partition_value: partition_value.unwrap_or(Struct::empty()), + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder { + type R = EqualityDeleteFileWriter; + type C = EqualityDeleteWriterConfig; + + async fn build(self, config: Self::C) -> Result { + Ok(EqualityDeleteFileWriter { + inner_writer: Some(self.inner.clone().build().await?), + projector: config.projector, + delete_schema_ref: config.schema, + equality_ids: config.equality_ids, + partition_value: config.partition_value, + }) + } +} + +/// A writer write data +pub struct EqualityDeleteFileWriter { + inner_writer: Option, + projector: FieldProjector, + delete_schema_ref: SchemaRef, + equality_ids: Vec, + partition_value: Struct, +} + +impl EqualityDeleteFileWriter { + fn project_record_batch_columns(&self, batch: RecordBatch) -> Result { + RecordBatch::try_new( + self.delete_schema_ref.clone(), + self.projector.project(batch.columns())?, + ) + .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) + } +} + +#[async_trait::async_trait] +impl IcebergWriter for EqualityDeleteFileWriter { + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + let batch = self.project_record_batch_columns(batch)?; + if let Some(writer) = self.inner_writer.as_mut() { + writer.write(&batch).await + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Equality delete inner writer does not exist", + )) + } + } + + async fn close(&mut self) -> Result> { + if let Some(writer) = self.inner_writer.take() { + Ok(writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(crate::spec::DataContentType::EqualityDeletes); + res.equality_ids(self.equality_ids.iter().map(|id| *id as i32).collect_vec()); + res.partition(self.partition_value.clone()); + res.build().expect("msg") + }) + .collect_vec()) + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Equality delete inner writer does not exist", + )) + } + } +} + +/// Help to project specific field from `RecordBatch`` according to the column id. 
+pub struct FieldProjector { + index_vec_vec: Vec>, +} + +impl FieldProjector { + /// Init FieldProjector + pub fn new( + batch_fields: &Fields, + column_ids: &[usize], + column_id_meta_key: &str, + ) -> Result<(Self, Fields)> { + let mut index_vec_vec = Vec::with_capacity(column_ids.len()); + let mut fields = Vec::with_capacity(column_ids.len()); + for &id in column_ids { + let mut index_vec = vec![]; + if let Ok(field) = Self::fetch_column_index( + batch_fields, + &mut index_vec, + id as i64, + column_id_meta_key, + ) { + fields.push(field.clone()); + index_vec_vec.push(index_vec); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Can't find source column id or column data type invalid: {}", + id + ), + )); + } + } + Ok((Self { index_vec_vec }, Fields::from_iter(fields))) + } + + fn fetch_column_index( + fields: &Fields, + index_vec: &mut Vec, + col_id: i64, + column_id_meta_key: &str, + ) -> Result { + for (pos, field) in fields.iter().enumerate() { + match field.data_type() { + DataType::Float16 | DataType::Float32 | DataType::Float64 => { + return Err(Error::new( + ErrorKind::DataInvalid, + "Delete column data type cannot be float or double", + )); + } + _ => { + let id: i64 = field + .metadata() + .get(column_id_meta_key) + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "column_id must be set"))? + .parse::() + .map_err(|_| { + Error::new(ErrorKind::DataInvalid, "column_id must be parsable as i64") + })?; + if col_id == id { + index_vec.push(pos); + return Ok(field.clone()); + } + if let DataType::Struct(inner) = field.data_type() { + let res = + Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key); + if !index_vec.is_empty() { + index_vec.push(pos); + return res; + } + } + } + } + } + Err(Error::new( + ErrorKind::DataInvalid, + "Column id not found in fields", + )) + } + /// Do projection with batch + pub fn project(&self, batch: &[ArrayRef]) -> Result> { + self.index_vec_vec + .iter() + .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec)) + .collect::>>() + } + + fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> Result { + let mut rev_iterator = index_vec.iter().rev(); + let mut array = batch[*rev_iterator.next().unwrap()].clone(); + for idx in rev_iterator { + array = array + .as_any() + .downcast_ref::() + .ok_or(Error::new( + ErrorKind::Unexpected, + "Cannot convert Array to StructArray", + ))? 
+ .column(*idx) + .clone(); + } + Ok(array) + } +} + +#[cfg(test)] +mod test { + use arrow_select::concat::concat_batches; + use itertools::Itertools; + use std::{collections::HashMap, sync::Arc}; + + use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray}; + use parquet::{ + arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, PARQUET_FIELD_ID_META_KEY}, + file::properties::WriterProperties, + }; + use tempfile::TempDir; + + use crate::{ + io::{FileIO, FileIOBuilder}, + spec::{DataFile, DataFileFormat}, + writer::{ + base_writer::equality_delete_writer::{ + EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, FieldProjector, + }, + file_writer::{ + location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, + ParquetWriterBuilder, + }, + IcebergWriter, IcebergWriterBuilder, + }, + }; + + async fn check_parquet_data_file_with_equality_delete_write( + file_io: &FileIO, + data_file: &DataFile, + batch: &RecordBatch, + ) { + assert_eq!(data_file.file_format, DataFileFormat::Parquet); + + // read the written file + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + // read the written file + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let metadata = reader_builder.metadata().clone(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(*batch, res); + + // check metadata + let expect_column_num = batch.num_columns(); + + assert_eq!( + data_file.record_count, + metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64 + ); + + assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64); + + assert_eq!(data_file.column_sizes.len(), expect_column_num); + + for (index, id) in data_file.column_sizes().keys().sorted().enumerate() { + metadata + .row_groups() + .iter() + .map(|group| group.columns()) + .for_each(|column| { + assert_eq!( + *data_file.column_sizes.get(id).unwrap() as i64, + column.get(index).unwrap().compressed_size() + ); + }); + } + + assert_eq!(data_file.value_counts.len(), expect_column_num); + data_file.value_counts.iter().for_each(|(_, &v)| { + let expect = metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64; + assert_eq!(v, expect); + }); + + for (index, id) in data_file.null_value_counts().keys().enumerate() { + let expect = batch.column(index).null_count() as u64; + assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect); + } + + assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups()); + data_file + .split_offsets + .iter() + .enumerate() + .for_each(|(i, &v)| { + let expect = metadata.row_groups()[i].file_offset().unwrap(); + assert_eq!(v, expect); + }); + } + + #[tokio::test] + async fn test_equality_delete_writer() -> Result<(), anyhow::Error> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // prepare data + // Int, Struct(Int), String, List(Int), Struct(Struct(Int)) + let schema = { + let fields = vec![ + arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true) + 
.with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + arrow_schema::Field::new( + "col1", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + ), + arrow_schema::Field::new( + "col3", + arrow_schema::DataType::List(Arc::new( + arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + arrow_schema::Field::new( + "col4", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "8".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), + ]; + arrow_schema::Schema::new(fields) + }; + let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + let col1 = Arc::new(StructArray::new( + vec![ + arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + ] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + )); + let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![ + "test"; + 1024 + ])) as ArrayRef; + let col3 = Arc::new({ + let list_parts = arrow_array::ListArray::from_iter_primitive::(vec![ + Some( + vec![Some(1),] + ); + 1024 + ]) + .into_parts(); + arrow_array::ListArray::new( + Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )]))), + list_parts.1, + list_parts.2, + list_parts.3, + ) + }) as ArrayRef; + let col4 = Arc::new(StructArray::new( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "8".to_string(), + )]))] + .into(), + vec![Arc::new(StructArray::new( + vec![ + arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + ] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + ))], + None, + )); + let columns = vec![col0, col1, col2, col3, col4]; + + let equality_ids = vec![1, 3]; + let (projector, fields) = + FieldProjector::new(schema.fields(), &equality_ids, 
PARQUET_FIELD_ID_META_KEY)?; + let delete_schema = arrow_schema::Schema::new(fields); + let delete_schema_ref = Arc::new(delete_schema.clone()); + + // prepare writer + let to_write = RecordBatch::try_new(Arc::new(schema.clone()), columns).unwrap(); + let pb = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + delete_schema_ref.clone(), + file_io.clone(), + location_gen, + file_name_gen, + ); + + let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb) + .build(EqualityDeleteWriterConfig::new( + equality_ids, + projector, + delete_schema.clone(), + None, + )) + .await?; + // write + equality_delete_writer.write(to_write.clone()).await?; + let res = equality_delete_writer.close().await?; + assert_eq!(res.len(), 1); + let data_file = res.into_iter().next().unwrap(); + + // check + let to_write_projected = equality_delete_writer.project_record_batch_columns(to_write)?; + check_parquet_data_file_with_equality_delete_write( + &file_io, + &data_file, + &to_write_projected, + ) + .await; + Ok(()) + } + + #[tokio::test] + async fn test_equality_delete_float_or_double_column() -> Result<(), anyhow::Error> { + // Float32, Float64 + let schema = { + let fields = vec![ + arrow_schema::Field::new("col0", arrow_schema::DataType::Float32, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + arrow_schema::Field::new("col1", arrow_schema::DataType::Float64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ]; + arrow_schema::Schema::new(fields) + }; + + let equality_id_float = vec![0]; + let result_float = FieldProjector::new( + schema.fields(), + &equality_id_float, + PARQUET_FIELD_ID_META_KEY, + ); + assert!(result_float.is_err()); + + let equality_ids_double = vec![1]; + let result_double = FieldProjector::new( + schema.fields(), + &equality_ids_double, + PARQUET_FIELD_ID_META_KEY, + ); + assert!(result_double.is_err()); + + Ok(()) + } +} \ No newline at end of file diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37da2ab818..37ab97eb6d 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -18,3 +18,4 @@ //! Base writer module contains the basic writer provide by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`. pub mod data_file_writer; +pub mod equality_delete_writer; From f3bdcac28bae080bd0ec046e238b52e18dfc4c45 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 19 Nov 2024 21:13:59 +0800 Subject: [PATCH 2/7] fix test and refine ArrowFieldProjector --- .../base_writer/equality_delete_writer.rs | 325 ++++++++---------- 1 file changed, 140 insertions(+), 185 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 3fb98aaea3..6448db9823 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -17,13 +17,15 @@ //! This module provide `EqualityDeleteWriter`. 
+use std::sync::Arc; + use arrow_array::{ArrayRef, RecordBatch, StructArray}; use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; use itertools::Itertools; use crate::spec::{DataFile, Struct}; -use crate::writer::file_writer::FileWriter; -use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder}; +use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; /// Builder for `EqualityDeleteWriter`. @@ -42,8 +44,7 @@ impl EqualityDeleteFileWriterBuilder { /// Config for `EqualityDeleteWriter`. pub struct EqualityDeleteWriterConfig { equality_ids: Vec, - projector: FieldProjector, - schema: SchemaRef, + projector: ArrowFieldProjector, partition_value: Struct, } @@ -51,14 +52,12 @@ impl EqualityDeleteWriterConfig { /// Create a new `DataFileWriterConfig` with equality ids. pub fn new( equality_ids: Vec, - projector: FieldProjector, - schema: Schema, + projector: ArrowFieldProjector, partition_value: Option, ) -> Self { Self { equality_ids, projector, - schema: schema.into(), partition_value: partition_value.unwrap_or(Struct::empty()), } } @@ -73,7 +72,6 @@ impl IcebergWriterBuilder for EqualityDeleteFileWriterBuil Ok(EqualityDeleteFileWriter { inner_writer: Some(self.inner.clone().build().await?), projector: config.projector, - delete_schema_ref: config.schema, equality_ids: config.equality_ids, partition_value: config.partition_value, }) @@ -83,26 +81,15 @@ impl IcebergWriterBuilder for EqualityDeleteFileWriterBuil /// A writer write data pub struct EqualityDeleteFileWriter { inner_writer: Option, - projector: FieldProjector, - delete_schema_ref: SchemaRef, + projector: ArrowFieldProjector, equality_ids: Vec, partition_value: Struct, } -impl EqualityDeleteFileWriter { - fn project_record_batch_columns(&self, batch: RecordBatch) -> Result { - RecordBatch::try_new( - self.delete_schema_ref.clone(), - self.projector.project(batch.columns())?, - ) - .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) - } -} - #[async_trait::async_trait] impl IcebergWriter for EqualityDeleteFileWriter { async fn write(&mut self, batch: RecordBatch) -> Result<()> { - let batch = self.project_record_batch_columns(batch)?; + let batch = self.projector.project_bacth(batch)?; if let Some(writer) = self.inner_writer.as_mut() { writer.write(&batch).await } else { @@ -136,23 +123,25 @@ impl IcebergWriter for EqualityDeleteFileWriter { } /// Help to project specific field from `RecordBatch`` according to the column id. 
-pub struct FieldProjector { +#[derive(Clone)] +pub struct ArrowFieldProjector { index_vec_vec: Vec>, + projected_schema: SchemaRef, } -impl FieldProjector { +impl ArrowFieldProjector { /// Init FieldProjector pub fn new( - batch_fields: &Fields, + original_schema: SchemaRef, column_ids: &[usize], column_id_meta_key: &str, - ) -> Result<(Self, Fields)> { + ) -> Result { let mut index_vec_vec = Vec::with_capacity(column_ids.len()); let mut fields = Vec::with_capacity(column_ids.len()); for &id in column_ids { let mut index_vec = vec![]; if let Ok(field) = Self::fetch_column_index( - batch_fields, + original_schema.fields(), &mut index_vec, id as i64, column_id_meta_key, @@ -169,7 +158,33 @@ impl FieldProjector { )); } } - Ok((Self { index_vec_vec }, Fields::from_iter(fields))) + let delete_arrow_schema = Arc::new(Schema::new(fields)); + Ok(Self { + index_vec_vec, + projected_schema: delete_arrow_schema, + }) + } + + /// Return the reference of projected schema + pub fn projected_schema_ref(&self) -> &SchemaRef { + &self.projected_schema + } + + /// Do projection with record batch + pub fn project_bacth(&self, batch: RecordBatch) -> Result { + RecordBatch::try_new( + self.projected_schema.clone(), + self.project_column(batch.columns())?, + ) + .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) + } + + /// Do projection with columns + pub fn project_column(&self, batch: &[ArrayRef]) -> Result> { + self.index_vec_vec + .iter() + .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec)) + .collect::>>() } fn fetch_column_index( @@ -215,13 +230,6 @@ impl FieldProjector { "Column id not found in fields", )) } - /// Do projection with batch - pub fn project(&self, batch: &[ArrayRef]) -> Result> { - self.index_vec_vec - .iter() - .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec)) - .collect::>>() - } fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> Result { let mut rev_iterator = index_vec.iter().rev(); @@ -243,31 +251,31 @@ impl FieldProjector { #[cfg(test)] mod test { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::types::Int32Type; + use arrow_array::{ArrayRef, Int32Array, RecordBatch, StructArray}; + use arrow_schema::DataType; use arrow_select::concat::concat_batches; use itertools::Itertools; - use std::{collections::HashMap, sync::Arc}; - - use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray}; - use parquet::{ - arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, PARQUET_FIELD_ID_META_KEY}, - file::properties::WriterProperties, - }; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; use tempfile::TempDir; - use crate::{ - io::{FileIO, FileIOBuilder}, - spec::{DataFile, DataFileFormat}, - writer::{ - base_writer::equality_delete_writer::{ - EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, FieldProjector, - }, - file_writer::{ - location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, - ParquetWriterBuilder, - }, - IcebergWriter, IcebergWriterBuilder, - }, + use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema}; + use crate::io::{FileIO, FileIOBuilder}; + use crate::spec::{ + DataFile, DataFileFormat, ListType, NestedField, PrimitiveType, Schema, StructType, Type, }; + use crate::writer::base_writer::equality_delete_writer::{ + ArrowFieldProjector, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, + 
}; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; async fn check_parquet_data_file_with_equality_delete_write( file_io: &FileIO, @@ -356,96 +364,59 @@ mod test { // prepare data // Int, Struct(Int), String, List(Int), Struct(Struct(Int)) - let schema = { - let fields = vec![ - arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "0".to_string(), - )])), - arrow_schema::Field::new( + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required( + 1, "col1", - arrow_schema::DataType::Struct( - vec![arrow_schema::Field::new( - "sub_col", - arrow_schema::DataType::Int64, - true, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )]))] - .into(), - ), - true, + Type::Struct(StructType::new(vec![NestedField::required( + 5, + "sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "1".to_string(), - )])), - arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), - ), - arrow_schema::Field::new( + .into(), + NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required( + 3, "col3", - arrow_schema::DataType::List(Arc::new( - arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )])), + Type::List(ListType::new( + NestedField::required(6, "element", Type::Primitive(PrimitiveType::Int)) + .into(), )), - true, ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "3".to_string(), - )])), - arrow_schema::Field::new( + .into(), + NestedField::required( + 4, "col4", - arrow_schema::DataType::Struct( - vec![arrow_schema::Field::new( - "sub_col", - arrow_schema::DataType::Struct( - vec![arrow_schema::Field::new( - "sub_sub_col", - arrow_schema::DataType::Int64, - true, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )]))] - .into(), - ), - true, + Type::Struct(StructType::new(vec![NestedField::required( + 7, + "sub_col", + Type::Struct(StructType::new(vec![NestedField::required( + 8, + "sub_sub_col", + Type::Primitive(PrimitiveType::Int), ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "8".to_string(), - )]))] - .into(), - ), - true, + .into()])), + ) + .into()])), ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "4".to_string(), - )])), - ]; - arrow_schema::Schema::new(fields) - }; - let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + .into(), + ]) + .build() + .unwrap(); + let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let col0 = Arc::new(Int32Array::from_iter_values(vec![1; 1024])) as ArrayRef; let col1 = Arc::new(StructArray::new( - vec![ - arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 
"5".to_string(), - )])), - ] - .into(), - vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() { + fields.clone() + } else { + unreachable!() + }, + vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))], None, )); let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![ @@ -453,7 +424,7 @@ mod test { 1024 ])) as ArrayRef; let col3 = Arc::new({ - let list_parts = arrow_array::ListArray::from_iter_primitive::(vec![ + let list_parts = arrow_array::ListArray::from_iter_primitive::(vec![ Some( vec![Some(1),] ); @@ -461,64 +432,49 @@ mod test { ]) .into_parts(); arrow_array::ListArray::new( - Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )]))), + if let DataType::List(field) = arrow_schema.fields.get(3).unwrap().data_type() { + field.clone() + } else { + unreachable!() + }, list_parts.1, list_parts.2, list_parts.3, ) }) as ArrayRef; let col4 = Arc::new(StructArray::new( - vec![arrow_schema::Field::new( - "sub_col", - arrow_schema::DataType::Struct( - vec![arrow_schema::Field::new( - "sub_sub_col", - arrow_schema::DataType::Int64, - true, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )]))] - .into(), - ), - true, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "8".to_string(), - )]))] - .into(), + if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() { + fields.clone() + } else { + unreachable!() + }, vec![Arc::new(StructArray::new( - vec![ - arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )])), - ] - .into(), - vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() { + if let DataType::Struct(fields) = fields.first().unwrap().data_type() { + fields.clone() + } else { + unreachable!() + } + } else { + unreachable!() + }, + vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))], None, ))], None, )); let columns = vec![col0, col1, col2, col3, col4]; + let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); let equality_ids = vec![1, 3]; - let (projector, fields) = - FieldProjector::new(schema.fields(), &equality_ids, PARQUET_FIELD_ID_META_KEY)?; - let delete_schema = arrow_schema::Schema::new(fields); - let delete_schema_ref = Arc::new(delete_schema.clone()); + let projector = + ArrowFieldProjector::new(arrow_schema, &equality_ids, PARQUET_FIELD_ID_META_KEY)?; + let delete_schema = arrow_schema_to_schema(projector.projected_schema_ref()).unwrap(); // prepare writer - let to_write = RecordBatch::try_new(Arc::new(schema.clone()), columns).unwrap(); let pb = ParquetWriterBuilder::new( WriterProperties::builder().build(), - delete_schema_ref.clone(), + Arc::new(delete_schema), file_io.clone(), location_gen, file_name_gen, @@ -527,8 +483,7 @@ mod test { let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb) .build(EqualityDeleteWriterConfig::new( equality_ids, - projector, - delete_schema.clone(), + projector.clone(), None, )) .await?; @@ -539,7 +494,7 @@ mod test { let data_file = res.into_iter().next().unwrap(); // check - let to_write_projected = equality_delete_writer.project_record_batch_columns(to_write)?; + let to_write_projected = 
projector.project_bacth(to_write)?; check_parquet_data_file_with_equality_delete_write( &file_io, &data_file, @@ -565,20 +520,20 @@ mod test { "1".to_string(), )])), ]; - arrow_schema::Schema::new(fields) + Arc::new(arrow_schema::Schema::new(fields)) }; let equality_id_float = vec![0]; - let result_float = FieldProjector::new( - schema.fields(), + let result_float = ArrowFieldProjector::new( + schema.clone(), &equality_id_float, PARQUET_FIELD_ID_META_KEY, ); assert!(result_float.is_err()); let equality_ids_double = vec![1]; - let result_double = FieldProjector::new( - schema.fields(), + let result_double = ArrowFieldProjector::new( + schema.clone(), &equality_ids_double, PARQUET_FIELD_ID_META_KEY, ); @@ -586,4 +541,4 @@ mod test { Ok(()) } -} \ No newline at end of file +} From 5ebe261dae5f7bc44d5b0c90a573f4e283d2b9eb Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 20 Nov 2024 22:59:13 +0800 Subject: [PATCH 3/7] add comment and rename to make code more clear --- .../base_writer/equality_delete_writer.rs | 106 +++++++++--------- 1 file changed, 50 insertions(+), 56 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 6448db9823..e03ac8b2a7 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch, StructArray}; use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::spec::{DataFile, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; @@ -43,7 +44,9 @@ impl EqualityDeleteFileWriterBuilder { /// Config for `EqualityDeleteWriter`. pub struct EqualityDeleteWriterConfig { + // Field ids used to determine row equality in equality delete files. equality_ids: Vec, + // Projector used to project the data chunk into specific fields. projector: ArrowFieldProjector, partition_value: Struct, } @@ -78,7 +81,7 @@ impl IcebergWriterBuilder for EqualityDeleteFileWriterBuil } } -/// A writer write data +/// Writer used to write equality delete files. pub struct EqualityDeleteFileWriter { inner_writer: Option, projector: ArrowFieldProjector, @@ -122,32 +125,32 @@ impl IcebergWriter for EqualityDeleteFileWriter { } } -/// Help to project specific field from `RecordBatch`` according to the column id. +/// Help to project specific field from `RecordBatch`` according to the fields id. #[derive(Clone)] pub struct ArrowFieldProjector { - index_vec_vec: Vec>, + // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure. + // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column, + // while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column). + field_indices: Vec>, + // The schema reference after projection. This schema is derived from the original schema based on the given field IDs. 
projected_schema: SchemaRef, } impl ArrowFieldProjector { - /// Init FieldProjector - pub fn new( - original_schema: SchemaRef, - column_ids: &[usize], - column_id_meta_key: &str, - ) -> Result { - let mut index_vec_vec = Vec::with_capacity(column_ids.len()); - let mut fields = Vec::with_capacity(column_ids.len()); - for &id in column_ids { - let mut index_vec = vec![]; - if let Ok(field) = Self::fetch_column_index( + /// Init ArrowFieldProjector + pub fn new(original_schema: SchemaRef, field_ids: &[usize]) -> Result { + let mut field_indexs = Vec::with_capacity(field_ids.len()); + let mut fields = Vec::with_capacity(field_ids.len()); + for &id in field_ids { + let mut field_index = vec![]; + if let Ok(field) = Self::fetch_field_index( original_schema.fields(), - &mut index_vec, + &mut field_index, id as i64, - column_id_meta_key, + PARQUET_FIELD_ID_META_KEY, ) { fields.push(field.clone()); - index_vec_vec.push(index_vec); + field_indexs.push(field_index); } else { return Err(Error::new( ErrorKind::DataInvalid, @@ -160,34 +163,12 @@ impl ArrowFieldProjector { } let delete_arrow_schema = Arc::new(Schema::new(fields)); Ok(Self { - index_vec_vec, + field_indices: field_indexs, projected_schema: delete_arrow_schema, }) } - /// Return the reference of projected schema - pub fn projected_schema_ref(&self) -> &SchemaRef { - &self.projected_schema - } - - /// Do projection with record batch - pub fn project_bacth(&self, batch: RecordBatch) -> Result { - RecordBatch::try_new( - self.projected_schema.clone(), - self.project_column(batch.columns())?, - ) - .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) - } - - /// Do projection with columns - pub fn project_column(&self, batch: &[ArrayRef]) -> Result> { - self.index_vec_vec - .iter() - .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec)) - .collect::>>() - } - - fn fetch_column_index( + fn fetch_field_index( fields: &Fields, index_vec: &mut Vec, col_id: i64, @@ -216,7 +197,7 @@ impl ArrowFieldProjector { } if let DataType::Struct(inner) = field.data_type() { let res = - Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key); + Self::fetch_field_index(inner, index_vec, col_id, column_id_meta_key); if !index_vec.is_empty() { index_vec.push(pos); return res; @@ -231,8 +212,30 @@ impl ArrowFieldProjector { )) } - fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> Result { - let mut rev_iterator = index_vec.iter().rev(); + /// Return the reference of projected schema + pub fn projected_schema_ref(&self) -> &SchemaRef { + &self.projected_schema + } + + /// Do projection with record batch + pub fn project_bacth(&self, batch: RecordBatch) -> Result { + RecordBatch::try_new( + self.projected_schema.clone(), + self.project_column(batch.columns())?, + ) + .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) + } + + /// Do projection with columns + pub fn project_column(&self, batch: &[ArrayRef]) -> Result> { + self.field_indices + .iter() + .map(|index_vec| Self::get_column_by_field_index(batch, index_vec)) + .collect::>>() + } + + fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result { + let mut rev_iterator = field_index.iter().rev(); let mut array = batch[*rev_iterator.next().unwrap()].clone(); for idx in rev_iterator { array = array @@ -467,8 +470,7 @@ mod test { let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); let equality_ids = vec![1, 3]; - let projector = - ArrowFieldProjector::new(arrow_schema, &equality_ids, 
PARQUET_FIELD_ID_META_KEY)?; + let projector = ArrowFieldProjector::new(arrow_schema, &equality_ids)?; let delete_schema = arrow_schema_to_schema(projector.projected_schema_ref()).unwrap(); // prepare writer @@ -524,19 +526,11 @@ mod test { }; let equality_id_float = vec![0]; - let result_float = ArrowFieldProjector::new( - schema.clone(), - &equality_id_float, - PARQUET_FIELD_ID_META_KEY, - ); + let result_float = ArrowFieldProjector::new(schema.clone(), &equality_id_float); assert!(result_float.is_err()); let equality_ids_double = vec![1]; - let result_double = ArrowFieldProjector::new( - schema.clone(), - &equality_ids_double, - PARQUET_FIELD_ID_META_KEY, - ); + let result_double = ArrowFieldProjector::new(schema.clone(), &equality_ids_double); assert!(result_double.is_err()); Ok(()) From ea606b6c64324a0a728014f064a59bf9f91cdf1d Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 25 Nov 2024 19:46:22 +0800 Subject: [PATCH 4/7] refine code --- crates/iceberg/src/arrow/mod.rs | 1 + .../src/arrow/record_batch_projector.rs | 261 ++++++++++++++++++ .../base_writer/equality_delete_writer.rs | 191 +++---------- 3 files changed, 299 insertions(+), 154 deletions(-) create mode 100644 crates/iceberg/src/arrow/record_batch_projector.rs diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 31a892fa82..0f01324cb8 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -20,6 +20,7 @@ mod schema; pub use schema::*; mod reader; +pub(crate) mod record_batch_projector; pub(crate) mod record_batch_transformer; pub use reader::*; diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs new file mode 100644 index 0000000000..40da4c7d48 --- /dev/null +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, RecordBatch, StructArray}; +use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; + +use crate::error::Result; +use crate::{Error, ErrorKind}; + +/// Help to project specific field from `RecordBatch`` according to the fields id of meta of field. +#[derive(Clone)] +pub struct RecordBatchProjector { + // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure. + // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column, + // while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column). + field_indices: Vec>, + // The schema reference after projection. This schema is derived from the original schema based on the given field IDs. 
+ projected_schema: SchemaRef, +} + +impl RecordBatchProjector { + /// Init ArrowFieldProjector + pub fn new( + original_schema: SchemaRef, + field_ids: &[i32], + field_id_fetch_func: F, + ) -> Result + where + F: Fn(&Field) -> Option, + { + let mut field_indices = Vec::with_capacity(field_ids.len()); + let mut fields = Vec::with_capacity(field_ids.len()); + for &id in field_ids { + let mut field_index = vec![]; + if let Ok(field) = Self::fetch_field_index( + original_schema.fields(), + &mut field_index, + id as i64, + &field_id_fetch_func, + ) { + fields.push(field.clone()); + field_indices.push(field_index); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Can't find source column id or column data type invalid: {}", + id + ), + )); + } + } + let delete_arrow_schema = Arc::new(Schema::new(fields)); + Ok(Self { + field_indices, + projected_schema: delete_arrow_schema, + }) + } + + fn fetch_field_index( + fields: &Fields, + index_vec: &mut Vec, + target_field_id: i64, + field_id_fetch_func: &F, + ) -> Result + where + F: Fn(&Field) -> Option, + { + for (pos, field) in fields.iter().enumerate() { + match field.data_type() { + DataType::Float16 | DataType::Float32 | DataType::Float64 => { + return Err(Error::new( + ErrorKind::DataInvalid, + "Delete column data type cannot be float or double", + )); + } + _ => { + let id = field_id_fetch_func(field).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "column_id must be parsable as i64") + })?; + if target_field_id == id { + index_vec.push(pos); + return Ok(field.clone()); + } + if let DataType::Struct(inner) = field.data_type() { + let res = Self::fetch_field_index( + inner, + index_vec, + target_field_id, + field_id_fetch_func, + ); + if !index_vec.is_empty() { + index_vec.push(pos); + return res; + } + } + } + } + } + Err(Error::new( + ErrorKind::DataInvalid, + "Column id not found in fields", + )) + } + + /// Return the reference of projected schema + pub fn projected_schema_ref(&self) -> &SchemaRef { + &self.projected_schema + } + + /// Do projection with record batch + pub fn project_bacth(&self, batch: RecordBatch) -> Result { + RecordBatch::try_new( + self.projected_schema.clone(), + self.project_column(batch.columns())?, + ) + .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) + } + + /// Do projection with columns + pub fn project_column(&self, batch: &[ArrayRef]) -> Result> { + self.field_indices + .iter() + .map(|index_vec| Self::get_column_by_field_index(batch, index_vec)) + .collect::>>() + } + + fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result { + let mut rev_iterator = field_index.iter().rev(); + let mut array = batch[*rev_iterator.next().unwrap()].clone(); + for idx in rev_iterator { + array = array + .as_any() + .downcast_ref::() + .ok_or(Error::new( + ErrorKind::Unexpected, + "Cannot convert Array to StructArray", + ))? 
+ .column(*idx) + .clone(); + } + Ok(array) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray}; + use arrow_schema::{DataType, Field, Fields, Schema}; + + use crate::arrow::record_batch_projector::RecordBatchProjector; + + #[test] + fn test_record_batch_projector_nested_level() { + let inner_fields = vec![ + Field::new("inner_field1", DataType::Int32, false), + Field::new("inner_field2", DataType::Utf8, false), + ]; + let fields = vec![ + Field::new("field1", DataType::Int32, false), + Field::new( + "field2", + DataType::Struct(Fields::from(inner_fields.clone())), + false, + ), + ]; + let schema = Arc::new(Schema::new(fields)); + + let field_id_fetch_func = |field: &Field| match field.name().as_str() { + "field1" => Some(1), + "field2" => Some(2), + "inner_field1" => Some(3), + "inner_field2" => Some(4), + _ => None, + }; + let projector = + RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func).unwrap(); + + assert!(projector.field_indices.len() == 2); + assert_eq!(projector.field_indices[0], vec![0]); + assert_eq!(projector.field_indices[1], vec![0, 1]); + + let int_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let inner_int_array = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef; + let inner_string_array = Arc::new(StringArray::from(vec!["x", "y", "z"])) as ArrayRef; + let struct_array = Arc::new(StructArray::from(vec![ + ( + Arc::new(inner_fields[0].clone()), + inner_int_array as ArrayRef, + ), + ( + Arc::new(inner_fields[1].clone()), + inner_string_array as ArrayRef, + ), + ])) as ArrayRef; + let batch = RecordBatch::try_new(schema, vec![int_array, struct_array]).unwrap(); + + let projected_batch = projector.project_bacth(batch).unwrap(); + assert_eq!(projected_batch.num_columns(), 2); + let projected_int_array = projected_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let projected_inner_int_array = projected_batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(projected_int_array.values(), &[1, 2, 3]); + assert_eq!(projected_inner_int_array.values(), &[4, 5, 6]); + } + + #[test] + fn test_field_not_found() { + let inner_fields = vec![ + Field::new("inner_field1", DataType::Int32, false), + Field::new("inner_field2", DataType::Utf8, false), + ]; + + let fields = vec![ + Field::new("field1", DataType::Int32, false), + Field::new( + "field2", + DataType::Struct(Fields::from(inner_fields.clone())), + false, + ), + ]; + let schema = Arc::new(Schema::new(fields)); + + let field_id_fetch_func = |field: &Field| match field.name().as_str() { + "field1" => Some(1), + "field2" => Some(2), + "inner_field1" => Some(3), + "inner_field2" => Some(4), + _ => None, + }; + let projector = RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func); + + assert!(projector.is_err()); + } +} diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index e03ac8b2a7..82ead5544d 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -17,13 +17,12 @@ //! This module provide `EqualityDeleteWriter`. 
-use std::sync::Arc; - -use arrow_array::{ArrayRef, RecordBatch, StructArray}; -use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; use itertools::Itertools; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use crate::arrow::record_batch_projector::RecordBatchProjector; use crate::spec::{DataFile, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; @@ -45,24 +44,35 @@ impl EqualityDeleteFileWriterBuilder { /// Config for `EqualityDeleteWriter`. pub struct EqualityDeleteWriterConfig { // Field ids used to determine row equality in equality delete files. - equality_ids: Vec, + equality_ids: Vec, // Projector used to project the data chunk into specific fields. - projector: ArrowFieldProjector, + projector: RecordBatchProjector, partition_value: Struct, } impl EqualityDeleteWriterConfig { /// Create a new `DataFileWriterConfig` with equality ids. pub fn new( - equality_ids: Vec, - projector: ArrowFieldProjector, + equality_ids: Vec, + original_schema: SchemaRef, partition_value: Option, - ) -> Self { - Self { + ) -> Result { + let projector = RecordBatchProjector::new(original_schema, &equality_ids, |field| { + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|value| value.parse::().ok()) + })?; + Ok(Self { equality_ids, projector, partition_value: partition_value.unwrap_or(Struct::empty()), - } + }) + } + + /// Return projected Schema + pub fn projected_schema_ref(&self) -> &SchemaRef { + self.projector.projected_schema_ref() } } @@ -84,8 +94,8 @@ impl IcebergWriterBuilder for EqualityDeleteFileWriterBuil /// Writer used to write equality delete files. pub struct EqualityDeleteFileWriter { inner_writer: Option, - projector: ArrowFieldProjector, - equality_ids: Vec, + projector: RecordBatchProjector, + equality_ids: Vec, partition_value: Struct, } @@ -98,7 +108,7 @@ impl IcebergWriter for EqualityDeleteFileWriter { } else { Err(Error::new( ErrorKind::Unexpected, - "Equality delete inner writer does not exist", + "Equality delete inner writer has been closed.", )) } } @@ -111,7 +121,7 @@ impl IcebergWriter for EqualityDeleteFileWriter { .into_iter() .map(|mut res| { res.content(crate::spec::DataContentType::EqualityDeletes); - res.equality_ids(self.equality_ids.iter().map(|id| *id as i32).collect_vec()); + res.equality_ids(self.equality_ids.iter().copied().collect_vec()); res.partition(self.partition_value.clone()); res.build().expect("msg") }) @@ -119,139 +129,12 @@ impl IcebergWriter for EqualityDeleteFileWriter { } else { Err(Error::new( ErrorKind::Unexpected, - "Equality delete inner writer does not exist", + "Equality delete inner writer has been closed.", )) } } } -/// Help to project specific field from `RecordBatch`` according to the fields id. -#[derive(Clone)] -pub struct ArrowFieldProjector { - // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure. - // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column, - // while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column). - field_indices: Vec>, - // The schema reference after projection. This schema is derived from the original schema based on the given field IDs. 
- projected_schema: SchemaRef, -} - -impl ArrowFieldProjector { - /// Init ArrowFieldProjector - pub fn new(original_schema: SchemaRef, field_ids: &[usize]) -> Result { - let mut field_indexs = Vec::with_capacity(field_ids.len()); - let mut fields = Vec::with_capacity(field_ids.len()); - for &id in field_ids { - let mut field_index = vec![]; - if let Ok(field) = Self::fetch_field_index( - original_schema.fields(), - &mut field_index, - id as i64, - PARQUET_FIELD_ID_META_KEY, - ) { - fields.push(field.clone()); - field_indexs.push(field_index); - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Can't find source column id or column data type invalid: {}", - id - ), - )); - } - } - let delete_arrow_schema = Arc::new(Schema::new(fields)); - Ok(Self { - field_indices: field_indexs, - projected_schema: delete_arrow_schema, - }) - } - - fn fetch_field_index( - fields: &Fields, - index_vec: &mut Vec, - col_id: i64, - column_id_meta_key: &str, - ) -> Result { - for (pos, field) in fields.iter().enumerate() { - match field.data_type() { - DataType::Float16 | DataType::Float32 | DataType::Float64 => { - return Err(Error::new( - ErrorKind::DataInvalid, - "Delete column data type cannot be float or double", - )); - } - _ => { - let id: i64 = field - .metadata() - .get(column_id_meta_key) - .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "column_id must be set"))? - .parse::() - .map_err(|_| { - Error::new(ErrorKind::DataInvalid, "column_id must be parsable as i64") - })?; - if col_id == id { - index_vec.push(pos); - return Ok(field.clone()); - } - if let DataType::Struct(inner) = field.data_type() { - let res = - Self::fetch_field_index(inner, index_vec, col_id, column_id_meta_key); - if !index_vec.is_empty() { - index_vec.push(pos); - return res; - } - } - } - } - } - Err(Error::new( - ErrorKind::DataInvalid, - "Column id not found in fields", - )) - } - - /// Return the reference of projected schema - pub fn projected_schema_ref(&self) -> &SchemaRef { - &self.projected_schema - } - - /// Do projection with record batch - pub fn project_bacth(&self, batch: RecordBatch) -> Result { - RecordBatch::try_new( - self.projected_schema.clone(), - self.project_column(batch.columns())?, - ) - .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) - } - - /// Do projection with columns - pub fn project_column(&self, batch: &[ArrayRef]) -> Result> { - self.field_indices - .iter() - .map(|index_vec| Self::get_column_by_field_index(batch, index_vec)) - .collect::>>() - } - - fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result { - let mut rev_iterator = field_index.iter().rev(); - let mut array = batch[*rev_iterator.next().unwrap()].clone(); - for idx in rev_iterator { - array = array - .as_any() - .downcast_ref::() - .ok_or(Error::new( - ErrorKind::Unexpected, - "Cannot convert Array to StructArray", - ))? 
- .column(*idx) - .clone(); - } - Ok(array) - } -} - #[cfg(test)] mod test { use std::collections::HashMap; @@ -273,7 +156,7 @@ mod test { DataFile, DataFileFormat, ListType, NestedField, PrimitiveType, Schema, StructType, Type, }; use crate::writer::base_writer::equality_delete_writer::{ - ArrowFieldProjector, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, + EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, }; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; @@ -469,9 +352,11 @@ mod test { let columns = vec![col0, col1, col2, col3, col4]; let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); - let equality_ids = vec![1, 3]; - let projector = ArrowFieldProjector::new(arrow_schema, &equality_ids)?; - let delete_schema = arrow_schema_to_schema(projector.projected_schema_ref()).unwrap(); + let equality_ids = vec![1_i32, 3]; + let equality_config = + EqualityDeleteWriterConfig::new(equality_ids, arrow_schema, None).unwrap(); + let delete_schema = arrow_schema_to_schema(equality_config.projected_schema_ref()).unwrap(); + let projector = equality_config.projector.clone(); // prepare writer let pb = ParquetWriterBuilder::new( @@ -483,12 +368,9 @@ mod test { ); let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb) - .build(EqualityDeleteWriterConfig::new( - equality_ids, - projector.clone(), - None, - )) + .build(equality_config) .await?; + // write equality_delete_writer.write(to_write.clone()).await?; let res = equality_delete_writer.close().await?; @@ -526,11 +408,12 @@ mod test { }; let equality_id_float = vec![0]; - let result_float = ArrowFieldProjector::new(schema.clone(), &equality_id_float); + let result_float = EqualityDeleteWriterConfig::new(equality_id_float, schema.clone(), None); assert!(result_float.is_err()); let equality_ids_double = vec![1]; - let result_double = ArrowFieldProjector::new(schema.clone(), &equality_ids_double); + let result_double = + EqualityDeleteWriterConfig::new(equality_ids_double, schema.clone(), None); assert!(result_double.is_err()); Ok(()) From 4acbb92ce6109c26ae76eda71e4697b7cbace10f Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 26 Nov 2024 21:36:16 +0800 Subject: [PATCH 5/7] refine RecordBatchProjector --- .../src/arrow/record_batch_projector.rs | 157 ++++++++++------- .../base_writer/equality_delete_writer.rs | 161 +++++++++++++----- 2 files changed, 213 insertions(+), 105 deletions(-) diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index 40da4c7d48..745a3005a9 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -23,9 +23,9 @@ use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use crate::error::Result; use crate::{Error, ErrorKind}; -/// Help to project specific field from `RecordBatch`` according to the fields id of meta of field. +/// Help to project specific field from `RecordBatch`` according to the fields id. #[derive(Clone)] -pub struct RecordBatchProjector { +pub(crate) struct RecordBatchProjector { // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure. // E.g. 
[[0], [1, 2]] means the first field is accessed directly from the first column, // while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column). @@ -36,35 +36,35 @@ pub struct RecordBatchProjector { impl RecordBatchProjector { /// Init ArrowFieldProjector - pub fn new( + /// + /// This function will iterate through the field and fetch the field from the original schema according to the field ids. + /// The function to fetch the field id from the field is provided by `field_id_fetch_func`, return None if the field need to be skipped. + /// This function will iterate through the nested fields if the field is a struct, `searchable_field_func` can be used to control whether + /// iterate into the nested fields. + pub(crate) fn new( original_schema: SchemaRef, field_ids: &[i32], - field_id_fetch_func: F, + field_id_fetch_func: F1, + searchable_field_func: F2, ) -> Result where - F: Fn(&Field) -> Option, + F1: Fn(&Field) -> Result>, + F2: Fn(&Field) -> bool, { let mut field_indices = Vec::with_capacity(field_ids.len()); let mut fields = Vec::with_capacity(field_ids.len()); for &id in field_ids { let mut field_index = vec![]; - if let Ok(field) = Self::fetch_field_index( + let field = Self::fetch_field_index( original_schema.fields(), &mut field_index, id as i64, &field_id_fetch_func, - ) { - fields.push(field.clone()); - field_indices.push(field_index); - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Can't find source column id or column data type invalid: {}", - id - ), - )); - } + &searchable_field_func, + )? + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field not found"))?; + fields.push(field.clone()); + field_indices.push(field_index); } let delete_arrow_schema = Arc::new(Schema::new(fields)); Ok(Self { @@ -73,59 +73,50 @@ impl RecordBatchProjector { }) } - fn fetch_field_index( + fn fetch_field_index( fields: &Fields, index_vec: &mut Vec, target_field_id: i64, - field_id_fetch_func: &F, - ) -> Result + field_id_fetch_func: &F1, + searchable_field_func: &F2, + ) -> Result> where - F: Fn(&Field) -> Option, + F1: Fn(&Field) -> Result>, + F2: Fn(&Field) -> bool, { for (pos, field) in fields.iter().enumerate() { - match field.data_type() { - DataType::Float16 | DataType::Float32 | DataType::Float64 => { - return Err(Error::new( - ErrorKind::DataInvalid, - "Delete column data type cannot be float or double", - )); + let id = field_id_fetch_func(field)?; + if let Some(id) = id { + if target_field_id == id { + index_vec.push(pos); + return Ok(Some(field.clone())); } - _ => { - let id = field_id_fetch_func(field).ok_or_else(|| { - Error::new(ErrorKind::DataInvalid, "column_id must be parsable as i64") - })?; - if target_field_id == id { + } + if let DataType::Struct(inner) = field.data_type() { + if searchable_field_func(field) { + if let Some(res) = Self::fetch_field_index( + inner, + index_vec, + target_field_id, + field_id_fetch_func, + searchable_field_func, + )? 
{ index_vec.push(pos); - return Ok(field.clone()); - } - if let DataType::Struct(inner) = field.data_type() { - let res = Self::fetch_field_index( - inner, - index_vec, - target_field_id, - field_id_fetch_func, - ); - if !index_vec.is_empty() { - index_vec.push(pos); - return res; - } + return Ok(Some(res)); } } } } - Err(Error::new( - ErrorKind::DataInvalid, - "Column id not found in fields", - )) + Ok(None) } /// Return the reference of projected schema - pub fn projected_schema_ref(&self) -> &SchemaRef { + pub(crate) fn projected_schema_ref(&self) -> &SchemaRef { &self.projected_schema } /// Do projection with record batch - pub fn project_bacth(&self, batch: RecordBatch) -> Result { + pub(crate) fn project_bacth(&self, batch: RecordBatch) -> Result { RecordBatch::try_new( self.projected_schema.clone(), self.project_column(batch.columns())?, @@ -134,7 +125,7 @@ impl RecordBatchProjector { } /// Do projection with columns - pub fn project_column(&self, batch: &[ArrayRef]) -> Result> { + pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result> { self.field_indices .iter() .map(|index_vec| Self::get_column_by_field_index(batch, index_vec)) @@ -167,6 +158,7 @@ mod test { use arrow_schema::{DataType, Field, Fields, Schema}; use crate::arrow::record_batch_projector::RecordBatchProjector; + use crate::{Error, ErrorKind}; #[test] fn test_record_batch_projector_nested_level() { @@ -185,14 +177,15 @@ mod test { let schema = Arc::new(Schema::new(fields)); let field_id_fetch_func = |field: &Field| match field.name().as_str() { - "field1" => Some(1), - "field2" => Some(2), - "inner_field1" => Some(3), - "inner_field2" => Some(4), - _ => None, + "field1" => Ok(Some(1)), + "field2" => Ok(Some(2)), + "inner_field1" => Ok(Some(3)), + "inner_field2" => Ok(Some(4)), + _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; let projector = - RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func).unwrap(); + RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func, |_| true) + .unwrap(); assert!(projector.field_indices.len() == 2); assert_eq!(projector.field_indices[0], vec![0]); @@ -248,14 +241,48 @@ mod test { let schema = Arc::new(Schema::new(fields)); let field_id_fetch_func = |field: &Field| match field.name().as_str() { - "field1" => Some(1), - "field2" => Some(2), - "inner_field1" => Some(3), - "inner_field2" => Some(4), - _ => None, + "field1" => Ok(Some(1)), + "field2" => Ok(Some(2)), + "inner_field1" => Ok(Some(3)), + "inner_field2" => Ok(Some(4)), + _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func); + let projector = + RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func, |_| true); + + assert!(projector.is_err()); + } + + #[test] + fn test_field_not_reachable() { + let inner_fields = vec![ + Field::new("inner_field1", DataType::Int32, false), + Field::new("inner_field2", DataType::Utf8, false), + ]; + + let fields = vec![ + Field::new("field1", DataType::Int32, false), + Field::new( + "field2", + DataType::Struct(Fields::from(inner_fields.clone())), + false, + ), + ]; + let schema = Arc::new(Schema::new(fields)); + let field_id_fetch_func = |field: &Field| match field.name().as_str() { + "field1" => Ok(Some(1)), + "field2" => Ok(Some(2)), + "inner_field1" => Ok(Some(3)), + "inner_field2" => Ok(Some(4)), + _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), + }; + let projector = + 
RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| false); assert!(projector.is_err()); + + let projector = + RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true); + assert!(projector.is_ok()); } } diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 82ead5544d..02ef9727ce 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -17,13 +17,16 @@ //! This module provide `EqualityDeleteWriter`. +use std::sync::Arc; + use arrow_array::RecordBatch; -use arrow_schema::SchemaRef; +use arrow_schema::{DataType, Field, SchemaRef as ArrowSchemaRef}; use itertools::Itertools; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::record_batch_projector::RecordBatchProjector; -use crate::spec::{DataFile, Struct}; +use crate::arrow::schema_to_arrow_schema; +use crate::spec::{DataFile, SchemaRef, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; @@ -57,12 +60,38 @@ impl EqualityDeleteWriterConfig { original_schema: SchemaRef, partition_value: Option, ) -> Result { - let projector = RecordBatchProjector::new(original_schema, &equality_ids, |field| { - field - .metadata() - .get(PARQUET_FIELD_ID_META_KEY) - .and_then(|value| value.parse::().ok()) - })?; + let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?); + let projector = RecordBatchProjector::new( + original_arrow_schema, + &equality_ids, + // The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids + // - The identifier field ids must be used for primitive types. + // - The identifier field ids must not be used for floating point types or nullable fields. + // - The identifier field ids can be nested field of struct but not nested field of nullable struct. + |field| { + // Only primitive type is allowed to be used for identifier field ids + if field.is_nullable() + || !field.data_type().is_primitive() + || matches!( + field.data_type(), + DataType::Float16 | DataType::Float32 | DataType::Float64 + ) + { + return Ok(None); + } + Ok(Some( + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "Field metadata is missing.") + })? 
+ .parse::() + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?, + )) + }, + |field: &Field| !field.is_nullable(), + )?; Ok(Self { equality_ids, projector, @@ -71,7 +100,7 @@ impl EqualityDeleteWriterConfig { } /// Return projected Schema - pub fn projected_schema_ref(&self) -> &SchemaRef { + pub fn projected_schema_ref(&self) -> &ArrowSchemaRef { self.projector.projected_schema_ref() } } @@ -137,7 +166,6 @@ impl IcebergWriter for EqualityDeleteFileWriter { #[cfg(test)] mod test { - use std::collections::HashMap; use std::sync::Arc; use arrow_array::types::Int32Type; @@ -146,14 +174,14 @@ mod test { use arrow_select::concat::concat_batches; use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; - use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::file::properties::WriterProperties; use tempfile::TempDir; use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema}; use crate::io::{FileIO, FileIOBuilder}; use crate::spec::{ - DataFile, DataFileFormat, ListType, NestedField, PrimitiveType, Schema, StructType, Type, + DataFile, DataFileFormat, ListType, MapType, NestedField, PrimitiveType, Schema, + StructType, Type, }; use crate::writer::base_writer::equality_delete_writer::{ EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, @@ -352,9 +380,9 @@ mod test { let columns = vec![col0, col1, col2, col3, col4]; let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); - let equality_ids = vec![1_i32, 3]; + let equality_ids = vec![0_i32, 8]; let equality_config = - EqualityDeleteWriterConfig::new(equality_ids, arrow_schema, None).unwrap(); + EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); let delete_schema = arrow_schema_to_schema(equality_config.projected_schema_ref()).unwrap(); let projector = equality_config.projector.clone(); @@ -389,32 +417,85 @@ mod test { } #[tokio::test] - async fn test_equality_delete_float_or_double_column() -> Result<(), anyhow::Error> { - // Float32, Float64 - let schema = { - let fields = vec![ - arrow_schema::Field::new("col0", arrow_schema::DataType::Float32, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "0".to_string(), - )])), - arrow_schema::Field::new("col1", arrow_schema::DataType::Float64, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "1".to_string(), - )])), - ]; - Arc::new(arrow_schema::Schema::new(fields)) - }; - - let equality_id_float = vec![0]; - let result_float = EqualityDeleteWriterConfig::new(equality_id_float, schema.clone(), None); - assert!(result_float.is_err()); - - let equality_ids_double = vec![1]; - let result_double = - EqualityDeleteWriterConfig::new(equality_ids_double, schema.clone(), None); - assert!(result_double.is_err()); + async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Float)).into(), + NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Double)).into(), + NestedField::optional(2, "col2", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required( + 3, + "col3", + Type::Struct(StructType::new(vec![NestedField::required( + 4, + "sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), + ) + .into(), + NestedField::optional( + 5, + "col4", + Type::Struct(StructType::new(vec![NestedField::required( + 6, + "sub_col2", + 
Type::Primitive(PrimitiveType::Int), + ) + .into()])), + ) + .into(), + NestedField::required( + 7, + "col5", + Type::Map(MapType::new( + Arc::new(NestedField::required( + 8, + "key", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::required( + 9, + "value", + Type::Primitive(PrimitiveType::Int), + )), + )), + ) + .into(), + NestedField::required( + 10, + "col6", + Type::List(ListType::new(Arc::new(NestedField::required( + 11, + "element", + Type::Primitive(PrimitiveType::Int), + )))), + ) + .into(), + ]) + .build() + .unwrap(), + ); + // Float and Double are not allowed to be used for equality delete + assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err()); + // Int is nullable, not allowed to be used for equality delete + assert!(EqualityDeleteWriterConfig::new(vec![2], schema.clone(), None).is_err()); + // Struct is not allowed to be used for equality delete + assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err()); + // Nested field of struct is allowed to be used for equality delete + assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok()); + // Nested field of optional struct is not allowed to be used for equality delete + assert!(EqualityDeleteWriterConfig::new(vec![6], schema.clone(), None).is_err()); + // Nested field of map is not allowed to be used for equality delete + assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err()); + // Nested field of list is not allowed to be used for equality delete + assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err()); Ok(()) } From 11bb9b022712e93c395d42ec51278d84f6a3ae90 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 27 Nov 2024 19:44:17 +0800 Subject: [PATCH 6/7] refine error --- crates/iceberg/src/arrow/record_batch_projector.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index 745a3005a9..f218983aa3 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -62,7 +62,10 @@ impl RecordBatchProjector { &field_id_fetch_func, &searchable_field_func, )? 
- .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field not found"))?; + .ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "Field not found") + .with_context("field_id", id.to_string()) + })?; fields.push(field.clone()); field_indices.push(field_index); } From a5fab3a0130679456de53e62698ee4c4ca528dd8 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 28 Nov 2024 11:34:15 +0800 Subject: [PATCH 7/7] refine function name --- .../iceberg/src/writer/base_writer/equality_delete_writer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 02ef9727ce..222961fc49 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -100,7 +100,7 @@ impl EqualityDeleteWriterConfig { } /// Return projected Schema - pub fn projected_schema_ref(&self) -> &ArrowSchemaRef { + pub fn projected_arrow_schema_ref(&self) -> &ArrowSchemaRef { self.projector.projected_schema_ref() } } @@ -383,7 +383,8 @@ mod test { let equality_ids = vec![0_i32, 8]; let equality_config = EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); - let delete_schema = arrow_schema_to_schema(equality_config.projected_schema_ref()).unwrap(); + let delete_schema = + arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap(); let projector = equality_config.projector.clone(); // prepare writer
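
For reference, a minimal end-to-end sketch of how the refactored pieces fit together, following the same flow as `test_equality_delete_writer` above. This is a sketch under assumptions, not part of the patch: the inner `FileWriterBuilder` (e.g. a `ParquetWriterBuilder`), the Iceberg `SchemaRef`, the equality ids, and the `RecordBatch` are assumed to be prepared exactly as in that test, and the helper name `write_equality_deletes` is illustrative only.

use arrow_array::RecordBatch;
use crate::spec::{DataFile, SchemaRef};
use crate::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use crate::writer::file_writer::FileWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

// Sketch only: build an equality-delete writer from the refactored config,
// write one batch, and return the produced DataFile metadata.
async fn write_equality_deletes<B: FileWriterBuilder>(
    inner: B,               // e.g. a ParquetWriterBuilder, set up as in the tests
    schema: SchemaRef,      // Iceberg schema; converted to an Arrow schema internally
    equality_ids: Vec<i32>, // must reference required, non-float primitive fields
    batch: RecordBatch,     // batch in the full table schema; projected internally
) -> Result<Vec<DataFile>> {
    // Validation of the equality ids against the identifier-field-id rules
    // happens inside the config constructor.
    let config = EqualityDeleteWriterConfig::new(equality_ids, schema, None)?;
    let mut writer = EqualityDeleteFileWriterBuilder::new(inner)
        .build(config)
        .await?;
    writer.write(batch).await?;
    // close() finalizes the delete files and returns their DataFile metadata.
    writer.close().await
}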