From 0efdffa070c3c5a8f5babddc862822dc2dd702e7 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 26 Dec 2023 12:23:34 +0800 Subject: [PATCH 1/4] init writer framework --- crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/writer/file_writer/mod.rs | 51 ++++++++ crates/iceberg/src/writer/mod.rs | 120 +++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 crates/iceberg/src/writer/file_writer/mod.rs create mode 100644 crates/iceberg/src/writer/mod.rs diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 7d652d8b09..9ceadcac87 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -51,3 +51,5 @@ mod scan; pub mod expr; pub mod transaction; pub mod transform; + +pub mod writer; diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs new file mode 100644 index 0000000000..70e6c15094 --- /dev/null +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg File Writer + +use super::{CurrentFileStatus, IcebergWriteResult}; +use crate::Result; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; + +/// File writer builder trait. +#[async_trait::async_trait] +pub trait FileWriterBuilder: Send + Clone + 'static { + /// The associated file writer type. + type R: FileWriter; + /// Build file writer. + async fn build(self, schema: &SchemaRef) -> Result; +} + +/// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) +#[async_trait::async_trait] +pub trait FileWriter: Send + 'static + CurrentFileStatus { + /// The associated file write result type. + type R: FileWriteResult; + /// Write record batch to file. + async fn write(&mut self, batch: &RecordBatch) -> Result<()>; + /// Close file writer. + async fn close(self) -> Result>; +} + +/// File write result. +pub trait FileWriteResult: Send + 'static { + /// The associated iceberg write result type. + type R: IcebergWriteResult; + /// Convert to iceberg write result. + fn to_iceberg_result(self) -> Self::R; +} diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs new file mode 100644 index 0000000000..59be5d09bb --- /dev/null +++ b/crates/iceberg/src/writer/mod.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg writer module. +//! +//! The writer API is designed to be extensible and flexible. Each writer is decoupled and can be create and config independently. User can: +//! 1.Combine different writer builder to build a writer which have complex write logic. Such as FanoutPartition + DataFileWrite or FanoutPartition + PosititionDeleteFileWrite. +//! 2.Customize the writer and combine it with original writer builder to build a writer which +//! can process the data in a specific way. +//! +//! There are two kinds of writer and related builder: +//! 1. `IcebergWriter` and `IcebergWriterBuilder`, they are focus on the data process logical. +//! If you want to support a new data process logical, you need to implement a new `IcebergWriter` and `IcebergWriterBuilder`. +//! 2. `FileWriter` and `FileWriterBuilder`, they are focus on the physical file write. +//! If you want to support a new physical file format, you need to implement a new `FileWriter` and `FileWriterBuilder`. +//! +//! The create process of iceberg writer is: +//! 1. Create a `FileWriterBuilder`. +//! 1a. Combine it with other `FileWriterBuilder` to get a new `FileWriterBuilder`. +//! 2. Use FileWriterBuilder to create a `IcebergWriterBuilder`. +//! 2a. Combine it with other `IcebergWriterBuilder` to get a new `IcebergWriterBuilder`. +//! 3. Use `build` function in `IcebergWriterBuilder` to create a `IcebergWriter`. +//! +//! # Simple Case 1: Create a data file writer using parquet file format. +//! # TODO(Implement this example) +//! ``` +//! // 1. Create a parquet file writer builder. +//! let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config); +//! // 2. Create a data file writer builder. +//! let DataFileWriterBuilder = DataFileWriterBuilder::new(parquet_writer_builder,data_file_writer_config); +//! // 3. Create a iceberg writer. +//! let iceberg_writer = DataFileWriterBuilder.build(schema).await?; +//! +//! iceberg_writer.write(input).await?; +//! +//! let write_result = iceberg_writer.flush().await?; +//! ``` +//! +//! # Complex Case 2: Create a fanout partition data file writer using parquet file format. +//! # TODO (Implement this example) +//! ``` +//! // 1. Create a parquet file writer builder. +//! let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config); +//! // 2. Create a data file writer builder. +//! let DataFileWriterBuilder = DataFileWriterBuilder::new(parquet_writer_builder,data_file_writer_config); +//! // 3. Create a fanout partition writer builder. +//! let fanout_partition_writer_builder = FanoutPartitionWriterBuilder::new(DataFileWriterBuilder, partition_config); +//! // 4. Create a iceberg writer. +//! let iceberg_writer = fanout_partition_writer_builder.build(schema).await?; +//! +//! iceberg_writer.write(input).await?; +//! +//! let write_result = iceberg_writer.flush().await?; +//! ``` + +use crate::{ + spec::{DataContentType, Struct}, + Result, +}; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; + +pub mod file_writer; + +type DefaultInput = RecordBatch; + +/// The builder for iceberg writer. +#[async_trait::async_trait] +pub trait IcebergWriterBuilder: Send + Clone + 'static { + /// The associated writer type. + type R: IcebergWriter; + /// Build the iceberg writer. + async fn build(self, schema: &SchemaRef) -> Result; +} + +/// The iceberg writer used to write data to iceberg table. +#[async_trait::async_trait] +pub trait IcebergWriter: Send + 'static { + /// The associated write result type. + type R: IcebergWriteResult; + /// Write data to iceberg table. + async fn write(&mut self, input: I) -> Result<()>; + /// Flush the writer and return the write result. + async fn flush(&mut self) -> Result>; +} + +/// The write result of iceberg writer. +pub trait IcebergWriteResult: Send + Sync + 'static { + /// Set the content type of the write result. + fn set_content(&mut self, content: DataContentType) -> &mut Self; + /// Set the equality ids of the write result. + fn set_equality_ids(&mut self, equality_ids: Vec) -> &mut Self; + /// Set the partition of the write result. + fn set_partition(&mut self, partition_value: Struct) -> &mut Self; +} + +/// The current file status of iceberg writer. It implement for the writer which write a single +/// file. +pub trait CurrentFileStatus { + /// Get the current file path. + fn current_file_path(&self) -> String; + /// Get the current file row number. + fn current_row_num(&self) -> usize; + /// Get the current file written size. + fn current_written_size(&self) -> usize; +} From a040551b56494c0463e7f22dfc6328839f835d17 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 8 Jan 2024 19:17:48 +0800 Subject: [PATCH 2/4] ignore doc test --- crates/iceberg/src/writer/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 59be5d09bb..e7cc83076f 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -37,7 +37,7 @@ //! //! # Simple Case 1: Create a data file writer using parquet file format. //! # TODO(Implement this example) -//! ``` +//! ```ignore //! // 1. Create a parquet file writer builder. //! let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config); //! // 2. Create a data file writer builder. @@ -52,7 +52,7 @@ //! //! # Complex Case 2: Create a fanout partition data file writer using parquet file format. //! # TODO (Implement this example) -//! ``` +//! ```ignore //! // 1. Create a parquet file writer builder. //! let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config); //! // 2. Create a data file writer builder. From a437a726345342cfa3ff5c60ccd7def03ef5cf5e Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 12 Jan 2024 12:39:21 +0800 Subject: [PATCH 3/4] remove **WriteResult trait --- crates/iceberg/src/writer/file_writer/mod.rs | 16 +++----------- crates/iceberg/src/writer/mod.rs | 23 +++++--------------- 2 files changed, 9 insertions(+), 30 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 70e6c15094..d49fa94e6c 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -17,8 +17,8 @@ //! Iceberg File Writer -use super::{CurrentFileStatus, IcebergWriteResult}; -use crate::Result; +use super::CurrentFileStatus; +use crate::{spec::DataFileBuilder, Result}; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; @@ -34,18 +34,8 @@ pub trait FileWriterBuilder: Send + Clone + 'static { /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) #[async_trait::async_trait] pub trait FileWriter: Send + 'static + CurrentFileStatus { - /// The associated file write result type. - type R: FileWriteResult; /// Write record batch to file. async fn write(&mut self, batch: &RecordBatch) -> Result<()>; /// Close file writer. - async fn close(self) -> Result>; -} - -/// File write result. -pub trait FileWriteResult: Send + 'static { - /// The associated iceberg write result type. - type R: IcebergWriteResult; - /// Convert to iceberg write result. - fn to_iceberg_result(self) -> Self::R; + async fn close(self) -> Result>; } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index e7cc83076f..fc756fb419 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -48,6 +48,8 @@ //! iceberg_writer.write(input).await?; //! //! let write_result = iceberg_writer.flush().await?; +//! +//! let data_file = write_result.into_iter().map(|builder|builder.build()).collect::>(); //! ``` //! //! # Complex Case 2: Create a fanout partition data file writer using parquet file format. @@ -65,12 +67,11 @@ //! iceberg_writer.write(input).await?; //! //! let write_result = iceberg_writer.flush().await?; +//! +//! let data_file = write_result.into_iter().map(|builder|builder.build()).collect::>(); //! ``` -use crate::{ - spec::{DataContentType, Struct}, - Result, -}; +use crate::{spec::DataFileBuilder, Result}; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; @@ -90,22 +91,10 @@ pub trait IcebergWriterBuilder: Send + Clone + 'static { /// The iceberg writer used to write data to iceberg table. #[async_trait::async_trait] pub trait IcebergWriter: Send + 'static { - /// The associated write result type. - type R: IcebergWriteResult; /// Write data to iceberg table. async fn write(&mut self, input: I) -> Result<()>; /// Flush the writer and return the write result. - async fn flush(&mut self) -> Result>; -} - -/// The write result of iceberg writer. -pub trait IcebergWriteResult: Send + Sync + 'static { - /// Set the content type of the write result. - fn set_content(&mut self, content: DataContentType) -> &mut Self; - /// Set the equality ids of the write result. - fn set_equality_ids(&mut self, equality_ids: Vec) -> &mut Self; - /// Set the partition of the write result. - fn set_partition(&mut self, partition_value: Struct) -> &mut Self; + async fn flush(&mut self) -> Result>; } /// The current file status of iceberg writer. It implement for the writer which write a single From bb07a58336bfd63072ce24928f649e09d3b74958 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 12 Jan 2024 14:53:12 +0800 Subject: [PATCH 4/4] * add generic param for output result * remove async trait --- crates/iceberg/src/writer/file_writer/mod.rs | 16 ++++++++-------- crates/iceberg/src/writer/mod.rs | 15 +++++++++------ 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index d49fa94e6c..888eb8c473 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -17,25 +17,25 @@ //! Iceberg File Writer -use super::CurrentFileStatus; -use crate::{spec::DataFileBuilder, Result}; +use super::{CurrentFileStatus, DefaultOutput}; +use crate::Result; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; /// File writer builder trait. -#[async_trait::async_trait] -pub trait FileWriterBuilder: Send + Clone + 'static { +#[allow(async_fn_in_trait)] +pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. - type R: FileWriter; + type R: FileWriter; /// Build file writer. async fn build(self, schema: &SchemaRef) -> Result; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) -#[async_trait::async_trait] -pub trait FileWriter: Send + 'static + CurrentFileStatus { +#[allow(async_fn_in_trait)] +pub trait FileWriter: Send + 'static + CurrentFileStatus { /// Write record batch to file. async fn write(&mut self, batch: &RecordBatch) -> Result<()>; /// Close file writer. - async fn close(self) -> Result>; + async fn close(self) -> Result; } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index fc756fb419..0ec8740863 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -78,23 +78,26 @@ use arrow_schema::SchemaRef; pub mod file_writer; type DefaultInput = RecordBatch; +type DefaultOutput = Vec; /// The builder for iceberg writer. -#[async_trait::async_trait] -pub trait IcebergWriterBuilder: Send + Clone + 'static { +#[allow(async_fn_in_trait)] +pub trait IcebergWriterBuilder: + Send + Clone + 'static +{ /// The associated writer type. - type R: IcebergWriter; + type R: IcebergWriter; /// Build the iceberg writer. async fn build(self, schema: &SchemaRef) -> Result; } /// The iceberg writer used to write data to iceberg table. -#[async_trait::async_trait] -pub trait IcebergWriter: Send + 'static { +#[allow(async_fn_in_trait)] +pub trait IcebergWriter: Send + 'static { /// Write data to iceberg table. async fn write(&mut self, input: I) -> Result<()>; /// Flush the writer and return the write result. - async fn flush(&mut self) -> Result>; + async fn flush(&mut self) -> Result; } /// The current file status of iceberg writer. It implement for the writer which write a single