Skip to content

Commit b724fdc

Browse files
committed
init writer framework
1 parent b703146 commit b724fdc

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed

crates/iceberg/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,5 @@ pub mod spec;
4949
pub mod expr;
5050
pub mod transaction;
5151
pub mod transform;
52+
53+
pub mod writer;
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg File Writer
19+
20+
use super::{CurrentFileStatus, IcebergWriteResult};
21+
use crate::Result;
22+
use arrow_array::RecordBatch;
23+
use arrow_schema::SchemaRef;
24+
25+
/// File writer builder trait.
26+
#[async_trait::async_trait]
27+
pub trait FileWriterBuilder: Send + Clone + 'static {
28+
/// The associated file writer type.
29+
type R: FileWriter;
30+
/// Build file writer.
31+
async fn build(self, schema: &SchemaRef) -> Result<Self::R>;
32+
}
33+
34+
/// File writer focus on writing record batch to different physical file format.(Such as parquet. orc)
35+
#[async_trait::async_trait]
36+
pub trait FileWriter: Send + 'static + CurrentFileStatus {
37+
/// The associated file write result type.
38+
type R: FileWriteResult;
39+
/// Write record batch to file.
40+
async fn write(&mut self, batch: &RecordBatch) -> Result<()>;
41+
/// Close file writer.
42+
async fn close(self) -> Result<Vec<Self::R>>;
43+
}
44+
45+
/// File write result.
46+
pub trait FileWriteResult: Send + 'static {
47+
/// The associated iceberg write result type.
48+
type R: IcebergWriteResult;
49+
/// Convert to iceberg write result.
50+
fn to_iceberg_result(self) -> Self::R;
51+
}

crates/iceberg/src/writer/mod.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg writer module.
19+
//!
20+
//! The writer API is designed to be extensible and flexible. Each writer is decoupled and can be create and config independently. User can:
21+
//! 1.Combine different writer builder to build a writer which have complex write logic. Such as FanoutPartition + DataFileWrite or FanoutPartition + PosititionDeleteFileWrite.
22+
//! 2.Customize the writer and combine it with original writer builder to build a writer which
23+
//! can process the data in a specific way.
24+
//!
25+
//! There are two kinds of writer and related builder:
26+
//! 1. `IcebergWriter` and `IcebergWriterBuilder`, they are focus on the data process logical.
27+
//! If you want to support a new data process logical, you need to implement a new `IcebergWriter` and `IcebergWriterBuilder`.
28+
//! 2. `FileWriter` and `FileWriterBuilder`, they are focus on the physical file write.
29+
//! If you want to support a new physical file format, you need to implement a new `FileWriter` and `FileWriterBuilder`.
30+
//!
31+
//! The create process of iceberg writer is:
32+
//! 1. Create a `FileWriterBuilder`.
33+
//! 1a. Combine it with other `FileWriterBuilder` to get a new `FileWriterBuilder`.
34+
//! 2. Use FileWriterBuilder to create a `IcebergWriterBuilder`.
35+
//! 2a. Combine it with other `IcebergWriterBuilder` to get a new `IcebergWriterBuilder`.
36+
//! 3. Use `build` function in `IcebergWriterBuilder` to create a `IcebergWriter`.
37+
//!
38+
//! # Simple Case 1: Create a data file writer using parquet file format.
39+
//! # TODO(Implement this example)
40+
//! ```
41+
//! // 1. Create a parquet file writer builder.
42+
//! let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config);
43+
//! // 2. Create a data file writer builder.
44+
//! let DataFileWriterBuilder = DataFileWriterBuilder::new(parquet_writer_builder,data_file_writer_config);
45+
//! // 3. Create a iceberg writer.
46+
//! let iceberg_writer = DataFileWriterBuilder.build(schema).await?;
47+
//!
48+
//! iceberg_writer.write(input).await?;
49+
//!
50+
//! let write_result = iceberg_writer.flush().await?;
51+
//! ```
52+
//!
53+
//! # Complex Case 2: Create a fanout partition data file writer using parquet file format.
54+
//! # TODO (Implement this example)
55+
//! ```
56+
//! // 1. Create a parquet file writer builder.
57+
//! let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config);
58+
//! // 2. Create a data file writer builder.
59+
//! let DataFileWriterBuilder = DataFileWriterBuilder::new(parquet_writer_builder,data_file_writer_config);
60+
//! // 3. Create a fanout partition writer builder.
61+
//! let fanout_partition_writer_builder = FanoutPartitionWriterBuilder::new(DataFileWriterBuilder, partition_config);
62+
//! // 4. Create a iceberg writer.
63+
//! let iceberg_writer = fanout_partition_writer_builder.build(schema).await?;
64+
//!
65+
//! iceberg_writer.write(input).await?;
66+
//!
67+
//! let write_result = iceberg_writer.flush().await?;
68+
//! ```
69+
70+
use crate::{
71+
spec::{DataContentType, Struct},
72+
Result,
73+
};
74+
use arrow_array::RecordBatch;
75+
use arrow_schema::SchemaRef;
76+
77+
pub mod file_writer;
78+
79+
type DefaultInput = RecordBatch;
80+
81+
/// The builder for iceberg writer.
82+
#[async_trait::async_trait]
83+
pub trait IcebergWriterBuilder<I = DefaultInput>: Send + Clone + 'static {
84+
/// The associated writer type.
85+
type R: IcebergWriter<I>;
86+
/// Build the iceberg writer.
87+
async fn build(self, schema: &SchemaRef) -> Result<Self::R>;
88+
}
89+
90+
/// The iceberg writer used to write data to iceberg table.
91+
#[async_trait::async_trait]
92+
pub trait IcebergWriter<I = DefaultInput>: Send + 'static {
93+
/// The associated write result type.
94+
type R: IcebergWriteResult;
95+
/// Write data to iceberg table.
96+
async fn write(&mut self, input: I) -> Result<()>;
97+
/// Flush the writer and return the write result.
98+
async fn flush(&mut self) -> Result<Vec<Self::R>>;
99+
}
100+
101+
/// The write result of iceberg writer.
102+
pub trait IcebergWriteResult: Send + Sync + 'static {
103+
/// Set the content type of the write result.
104+
fn set_content(&mut self, content: DataContentType) -> &mut Self;
105+
/// Set the equality ids of the write result.
106+
fn set_equality_ids(&mut self, equality_ids: Vec<i32>) -> &mut Self;
107+
/// Set the partition of the write result.
108+
fn set_partition(&mut self, partition_value: Struct) -> &mut Self;
109+
}
110+
111+
/// The current file status of iceberg writer. It implement for the writer which write a single
112+
/// file.
113+
pub trait CurrentFileStatus {
114+
/// Get the current file path.
115+
fn current_file_path(&self) -> String;
116+
/// Get the current file row number.
117+
fn current_row_num(&self) -> usize;
118+
/// Get the current file written size.
119+
fn current_written_size(&self) -> usize;
120+
}

0 commit comments

Comments
 (0)