diff --git a/Cargo.lock b/Cargo.lock index 31e774c1d2..13bff64bad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2943,6 +2943,7 @@ dependencies = [ "arrow-buffer", "arrow-cast", "arrow-ord", + "arrow-row", "arrow-schema", "arrow-select", "arrow-string", diff --git a/Cargo.toml b/Cargo.toml index 8d093a27b7..fb12019436 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ arrow-ord = { version = "54.1.0" } arrow-schema = { version = "54.1.0" } arrow-select = { version = "54.1.0" } arrow-string = { version = "54.1.0" } +arrow-row = { version = "54.1.0" } async-stream = "0.3.5" async-trait = "0.1.86" async-std = "1.12" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 7320c455d1..931e77d155 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -49,6 +49,7 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 0c885e65f4..c4d96d59d5 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -25,3 +25,5 @@ pub(crate) mod record_batch_transformer; mod value; pub use reader::*; pub use value::*; +mod record_batch_partition_spliter; +pub(crate) use record_batch_partition_spliter::*; diff --git a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs new file mode 100644 index 0000000000..e9816d89a2 --- /dev/null +++ b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs @@ -0,0 +1,478 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::{DataType, Schema as ArrowSchema}; +use arrow_select::filter::filter_record_batch; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use super::record_batch_projector::RecordBatchProjector; +use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type}; +use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type}; +use crate::transform::{create_transform_function, BoxedTransformFunction}; +use crate::{Error, ErrorKind, Result}; + +/// A helper function to split the record batch into multiple record batches using computed partition columns. 
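+/// The partition columns are first encoded as `arrow_row` rows so that equal partition values
+/// compare equal, row indices are then grouped by partition value, and each group is materialized
+/// by filtering the input batch with a boolean mask. The result holds one `(OwnedRow, RecordBatch)`
+/// pair per distinct partition value.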
+pub(crate) fn split_with_partition( + row_converter: &RowConverter, + partition_columns: &[ArrayRef], + batch: &RecordBatch, +) -> Result> { + let rows = row_converter + .convert_columns(partition_columns) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{}", e)))?; + + // Group the batch by row value. + let mut group_ids = HashMap::new(); + rows.into_iter().enumerate().for_each(|(row_id, row)| { + group_ids.entry(row.owned()).or_insert(vec![]).push(row_id); + }); + + // Partition the batch with same partition partition_values + let mut partition_batches = Vec::with_capacity(group_ids.len()); + for (row, row_ids) in group_ids.into_iter() { + // generate the bool filter array from column_ids + let filter_array: BooleanArray = { + let mut filter = vec![false; batch.num_rows()]; + row_ids.into_iter().for_each(|row_id| { + filter[row_id] = true; + }); + filter.into() + }; + + // filter the RecordBatch + partition_batches.push(( + row, + filter_record_batch(batch, &filter_array) + .expect("We should guarantee the filter array is valid"), + )); + } + + Ok(partition_batches) +} + +pub(crate) fn convert_row_to_struct( + row_converter: &RowConverter, + struct_type: &StructType, + rows: Vec, +) -> Result> { + let arrow_struct_array = { + let partition_columns = row_converter + .convert_rows(rows.iter().map(|row| row.row())) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{e}")))?; + let partition_arrow_fields = { + let partition_arrow_type = type_to_arrow_type(&Type::Struct(struct_type.clone()))?; + let DataType::Struct(fields) = partition_arrow_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partition arrow type is not a struct type", + )); + }; + fields + }; + Arc::new(StructArray::try_new( + partition_arrow_fields, + partition_columns, + None, + )?) as ArrayRef + }; + let struct_array = { + let struct_array = arrow_struct_to_literal(&arrow_struct_array, struct_type)?; + struct_array + .into_iter() + .map(|s| { + if let Some(s) = s { + if let Literal::Struct(s) = s { + Ok(s) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + "The struct is not a struct literal", + )) + } + } else { + Err(Error::new(ErrorKind::DataInvalid, "The struct is null")) + } + }) + .collect::>>()? + }; + + Ok(struct_array) +} + +/// The spliter used to split the record batch into multiple record batches by the partition spec. +pub(crate) struct RecordBatchPartitionSpliter { + partition_spec: PartitionSpecRef, + schema: SchemaRef, + projector: RecordBatchProjector, + transform_functions: Vec, + row_converter: RowConverter, +} + +impl RecordBatchPartitionSpliter { + pub(crate) fn new( + arrow_schema: &ArrowSchema, + table_schema: SchemaRef, + partition_spec: PartitionSpecRef, + ) -> Result { + if partition_spec.fields().is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Fail to create partition spliter using empty partition spec", + )); + } + let projector = RecordBatchProjector::new( + arrow_schema, + &partition_spec + .fields() + .iter() + .map(|field| field.source_id) + .collect::>(), + // The source columns, selected by ids, must be a primitive type and cannot be contained in a map or list, but may be nested in a struct. 
+ // ref: https://iceberg.apache.org/spec/#partitioning + |field| { + if field.data_type().is_nested() { + return Ok(None); + } + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .map(|s| { + s.parse::() + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string())) + }) + .transpose() + }, + |_| true, + )?; + let transform_functions = partition_spec + .fields() + .iter() + .map(|field| create_transform_function(&field.transform)) + .collect::>>()?; + let row_converter = RowConverter::new( + partition_spec + .partition_type(&table_schema)? + .fields() + .iter() + .map(|f| Ok(SortField::new(type_to_arrow_type(&f.field_type)?))) + .collect::>>()?, + )?; + Ok(Self { + partition_spec, + schema: table_schema, + projector, + transform_functions, + row_converter, + }) + } + + /// Split the record batch into multiple record batches by the partition spec. + pub(crate) fn split(&self, batch: &RecordBatch) -> Result> { + // get array using partition spec + let source_columns = self.projector.project_column(batch.columns())?; + let partition_columns = source_columns + .into_iter() + .zip_eq(self.transform_functions.iter()) + .map(|(source_column, transform_function)| transform_function.transform(source_column)) + .collect::>>()?; + + split_with_partition(&self.row_converter, &partition_columns, batch) + } + + pub(crate) fn convert_row(&self, rows: Vec) -> Result> { + convert_row_to_struct( + &self.row_converter, + &self.partition_spec.partition_type(&self.schema)?, + rows, + ) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + + use super::*; + use crate::arrow::schema_to_arrow_schema; + use crate::spec::{NestedField, PartitionSpec, Schema, Transform, UnboundPartitionField}; + + #[test] + fn test_record_batch_partition_spliter() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let partition_spliter = RecordBatchPartitionSpliter::new( + &schema_to_arrow_schema(&schema).unwrap(), + Arc::new(schema), + Arc::new(partition_spec), + ) + .expect("Failed to create spliter"); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("data", DataType::Utf8, false), + ])); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + 
Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + // check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } + + #[test] + fn test_record_batch_partition_spliter_with_extra_columns() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("extra_column1", DataType::Utf8, true), + Field::new("id", DataType::Int32, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("extra_column2", DataType::Utf8, true), + ])); + let extra_column1_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let extra_column2_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(extra_column1_array), + Arc::new(id_array), + Arc::new(data_array), + Arc::new(extra_column2_array), + ]) + .expect("Failed to create RecordBatch"); + let partition_spliter = RecordBatchPartitionSpliter::new( + &arrow_schema, + Arc::new(schema), + Arc::new(partition_spec), + ) + .expect("Failed to create spliter"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_extra_column1_array = + 
StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_extra_column2_array = + StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_extra_column1_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_extra_column2_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + // check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_extra_column1_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_extra_column2_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } + + #[test] + fn test_empty_partition() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .build() + .unwrap(); + assert!(RecordBatchPartitionSpliter::new( + &schema_to_arrow_schema(&schema).unwrap(), + Arc::new(schema), + Arc::new(partition_spec), + ) + .is_err()) + } +} diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index 878d0fe28e..f27f8dcab0 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -43,7 +43,7 @@ impl RecordBatchProjector { /// This function will iterate through the nested fields if the field is a struct, `searchable_field_func` can be used to control whether /// iterate into the nested fields. 
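+    /// `field_id_fetch_func` maps an Arrow field to its Iceberg field id (returning `Ok(None)` to
+    /// skip a field), and construction fails if any id requested in `field_ids` cannot be located.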
pub(crate) fn new( - original_schema: SchemaRef, + original_schema: &Schema, field_ids: &[i32], field_id_fetch_func: F1, searchable_field_func: F2, @@ -192,8 +192,7 @@ mod test { _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; let projector = - RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func, |_| true) - .unwrap(); + RecordBatchProjector::new(&schema, &[1, 3], field_id_fetch_func, |_| true).unwrap(); assert_eq!(projector.field_indices.len(), 2); assert_eq!(projector.field_indices[0], vec![0]); @@ -255,8 +254,7 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[1, 5], field_id_fetch_func, |_| true); assert!(projector.is_err()); } @@ -285,12 +283,10 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| false); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| false); assert!(projector.is_err()); - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| true); assert!(projector.is_ok()); } } diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index c82f353fe1..e1d37f3213 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -1441,6 +1441,10 @@ impl DataFile { pub fn sort_order_id(&self) -> Option { self.sort_order_id } + + pub(crate) fn rewrite_partition(&mut self, partition: Struct) { + self.partition = partition; + } } /// Convert data files to avro bytes and write to writer. diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index fb9682573b..e5c5783d51 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -64,7 +64,7 @@ impl EqualityDeleteWriterConfig { ) -> Result { let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?); let projector = RecordBatchProjector::new( - original_arrow_schema, + &original_arrow_schema, &equality_ids, // The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids // and https://iceberg.apache.org/spec/#equality-delete-files diff --git a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs new file mode 100644 index 0000000000..8309b859ff --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the fanout partition writer. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_row::OwnedRow; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use itertools::Itertools; + +use crate::arrow::{schema_to_arrow_schema, RecordBatchPartitionSpliter}; +use crate::spec::{DataFile, PartitionSpecRef, SchemaRef}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::Result; + +/// The builder for `FanoutPartitionWriter`. +#[derive(Clone)] +pub struct FanoutPartitionWriterBuilder { + inner_builder: B, + partition_specs: PartitionSpecRef, + table_schema: SchemaRef, + arrow_schema: ArrowSchemaRef, +} + +impl FanoutPartitionWriterBuilder { + /// Create a new `FanoutPartitionWriterBuilder` with the default arrow schema. + pub fn new( + inner_builder: B, + partition_specs: PartitionSpecRef, + table_schema: SchemaRef, + ) -> Result { + Ok(Self::new_with_custom_schema( + inner_builder, + Arc::new(schema_to_arrow_schema(&table_schema)?), + partition_specs, + table_schema, + )) + } + + /// Create a new `FanoutPartitionWriterBuilder` with a custom arrow schema. + /// This function is useful for the user who has the input with extral columns. + pub fn new_with_custom_schema( + inner_builder: B, + arrow_schema: ArrowSchemaRef, + partition_specs: PartitionSpecRef, + table_schema: SchemaRef, + ) -> Self { + Self { + inner_builder, + partition_specs, + table_schema, + arrow_schema, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for FanoutPartitionWriterBuilder { + type R = FanoutPartitionWriter; + + async fn build(self) -> Result { + let partition_splitter = RecordBatchPartitionSpliter::new( + &self.arrow_schema, + self.table_schema.clone(), + self.partition_specs, + )?; + Ok(FanoutPartitionWriter { + inner_writer_builder: self.inner_builder, + partition_splitter, + partition_writers: HashMap::new(), + }) + } +} + +/// The fanout partition writer. +/// It will split the input record batch by the partition specs, and write the splitted record batches to the inner writers. +pub struct FanoutPartitionWriter { + inner_writer_builder: B, + partition_splitter: RecordBatchPartitionSpliter, + partition_writers: HashMap, +} + +impl FanoutPartitionWriter { + /// Get the current number of partition writers. 
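+    /// Each partition observed so far keeps one open inner writer, so this also gives a rough
+    /// indication of the writer's current fan-out and memory footprint.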
+ pub fn partition_num(&self) -> usize { + self.partition_writers.len() + } +} + +#[async_trait::async_trait] +impl IcebergWriter for FanoutPartitionWriter { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + let splits = self.partition_splitter.split(&input)?; + + for (partition, record_batch) in splits { + match self.partition_writers.entry(partition) { + Entry::Occupied(entry) => { + entry.into_mut().write(record_batch).await?; + } + Entry::Vacant(entry) => { + let writer = entry.insert(self.inner_writer_builder.clone().build().await?); + writer.write(record_batch).await?; + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result> { + let (partition_rows, writers): (Vec<_>, Vec<_>) = self.partition_writers.drain().unzip(); + let partition_values = self.partition_splitter.convert_row(partition_rows)?; + + let mut result = Vec::new(); + for (partition_value, mut writer) in partition_values.into_iter().zip_eq(writers) { + let mut data_files = writer.close().await?; + for data_file in data_files.iter_mut() { + data_file.rewrite_partition(partition_value.clone()); + } + result.append(&mut data_files); + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use itertools::Itertools; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{ + DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionField, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_fanout_partition_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(pw, None); + let mut fanout_partition_writer = FanoutPartitionWriterBuilder::new( + data_file_writer_builder, + 
Arc::new(partition_spec), + schema, + ) + .unwrap() + .build() + .await + .unwrap(); + + // prepare data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + + fanout_partition_writer.write(batch).await?; + let data_files = fanout_partition_writer.close().await?; + assert_eq!(data_files.len(), 3); + let expected_partitions = vec![ + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(1)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(2)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(3)))]), + ]; + let expected_batches = vec![ + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 1, 1])), + Arc::new(StringArray::from(vec!["a", "c", "g"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![2, 2])), + Arc::new(StringArray::from(vec!["b", "e"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3, 3])), + Arc::new(StringArray::from(vec!["d", "f"])), + ]) + .unwrap(), + ]; + for (partition, batch) in expected_partitions + .into_iter() + .zip_eq(expected_batches.into_iter()) + { + assert!(data_files.iter().any(|file| file.partition == partition)); + let data_file = data_files + .iter() + .find(|file| file.partition == partition) + .unwrap(); + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(batch, res); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs new file mode 100644 index 0000000000..4d608d6571 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the functional writer. 
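+//!
+//! `fanout_partition_writer` computes partition values from the table's partition spec and routes
+//! each row to a per-partition inner writer, while `precompute_partition_writer` expects the caller
+//! to supply already-computed partition values as a `StructArray` alongside each record batch.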
+ +pub mod fanout_partition_writer; +pub mod precompute_partition_writer; diff --git a/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs new file mode 100644 index 0000000000..7426d50d23 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the precompute partition writer. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +use arrow_array::{RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::DataType; +use itertools::Itertools; + +use crate::arrow::{convert_row_to_struct, split_with_partition, type_to_arrow_type}; +use crate::spec::{DataFile, PartitionSpecRef, SchemaRef, Type}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// The builder for precompute partition writer. +#[derive(Clone)] +pub struct PrecomputePartitionWriterBuilder { + inner_writer_builder: B, + partition_spec: PartitionSpecRef, + schema: SchemaRef, +} + +impl PrecomputePartitionWriterBuilder { + /// Create a new precompute partition writer builder. + pub fn new( + inner_writer_builder: B, + partition_spec: PartitionSpecRef, + schema: SchemaRef, + ) -> Self { + Self { + inner_writer_builder, + partition_spec, + schema, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder<(StructArray, RecordBatch)> + for PrecomputePartitionWriterBuilder +{ + type R = PrecomputePartitionWriter; + + async fn build(self) -> Result { + let arrow_type = type_to_arrow_type(&Type::Struct( + self.partition_spec.partition_type(&self.schema)?, + ))?; + let DataType::Struct(fields) = &arrow_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partition type is not a struct", + )); + }; + let partition_row_converter = RowConverter::new( + fields + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; + Ok(PrecomputePartitionWriter { + inner_writer_builder: self.inner_writer_builder, + partition_row_converter, + partition_spec: self.partition_spec, + partition_writers: HashMap::new(), + schema: self.schema, + }) + } +} + +/// The precompute partition writer. 
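+/// Unlike `FanoutPartitionWriter`, it does not evaluate partition transforms itself: each input is a
+/// `(StructArray, RecordBatch)` pair in which the struct array carries the precomputed partition
+/// value for every row (matching the partition spec's partition type), and rows are split and routed
+/// to per-partition inner writers based on those values.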
+pub struct PrecomputePartitionWriter { + inner_writer_builder: B, + partition_writers: HashMap, + partition_row_converter: RowConverter, + partition_spec: PartitionSpecRef, + schema: SchemaRef, +} + +#[async_trait::async_trait] +impl IcebergWriter<(StructArray, RecordBatch)> + for PrecomputePartitionWriter +{ + async fn write(&mut self, input: (StructArray, RecordBatch)) -> Result<()> { + let splits = + split_with_partition(&self.partition_row_converter, input.0.columns(), &input.1)?; + + for (partition, record_batch) in splits { + match self.partition_writers.entry(partition) { + Entry::Occupied(entry) => { + entry.into_mut().write(record_batch).await?; + } + Entry::Vacant(entry) => { + let writer = entry.insert(self.inner_writer_builder.clone().build().await?); + writer.write(record_batch).await?; + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result> { + let (partition_rows, writers): (Vec<_>, Vec<_>) = self.partition_writers.drain().unzip(); + let partition_values = convert_row_to_struct( + &self.partition_row_converter, + &self.partition_spec.partition_type(&self.schema)?, + partition_rows, + )?; + + let mut result = Vec::new(); + for (partition_value, mut writer) in partition_values.into_iter().zip_eq(writers) { + let mut data_files = writer.close().await?; + for data_file in data_files.iter_mut() { + data_file.rewrite_partition(partition_value.clone()); + } + result.append(&mut data_files); + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StructArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use itertools::Itertools; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{ + DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionField, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::precompute_partition_writer::PrecomputePartitionWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_precompute_partition_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let 
pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(pw, None); + let mut precompute_partition_writer = PrecomputePartitionWriterBuilder::new( + data_file_writer_builder, + Arc::new(partition_spec), + schema, + ) + .build() + .await + .unwrap(); + + // prepare data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + let id_bucket_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let partition_batch = StructArray::from(vec![( + Arc::new(Field::new("id_bucket", DataType::Int64, true)), + Arc::new(id_bucket_array) as ArrayRef, + )]); + + precompute_partition_writer + .write((partition_batch, batch)) + .await?; + let data_files = precompute_partition_writer.close().await?; + assert_eq!(data_files.len(), 3); + let expected_partitions = vec![ + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(1)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(2)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(3)))]), + ]; + let expected_batches = vec![ + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 1, 1])), + Arc::new(StringArray::from(vec!["a", "c", "g"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![2, 2])), + Arc::new(StringArray::from(vec!["b", "e"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3, 3])), + Arc::new(StringArray::from(vec!["d", "f"])), + ]) + .unwrap(), + ]; + for (partition, batch) in expected_partitions + .into_iter() + .zip_eq(expected_batches.into_iter()) + { + assert!(data_files.iter().any(|file| file.partition == partition)); + let data_file = data_files + .iter() + .find(|file| file.partition == partition) + .unwrap(); + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(batch, res); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 64357a0fe2..57191545cc 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -47,6 +47,7 @@ pub mod base_writer; pub mod file_writer; +pub mod function_writer; use arrow_array::RecordBatch;