
Commit e8bfe70

ZENOTME authored and shaeqahmed committed

feat: init iceberg writer (apache#275)

* init iceberg writer
* refine
* refine the interface

---------

Co-authored-by: ZENOTME <[email protected]>

1 parent a734661 · commit e8bfe70

7 files changed: +524 -116 lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -40,6 +40,7 @@ array-init = "2"
 arrow-arith = { version = "51" }
 arrow-array = { version = "51" }
 arrow-schema = { version = "51" }
+arrow-select = { version = "51" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 aws-config = "1.1.8"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -35,6 +35,7 @@ array-init = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
 async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 318 additions & 0 deletions (new file)

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `DataFileWriter`.

use crate::spec::{DataContentType, DataFile, Struct};
use crate::writer::file_writer::FileWriter;
use crate::writer::CurrentFileStatus;
use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder};
use crate::Result;
use arrow_array::RecordBatch;
use itertools::Itertools;

/// Builder for `DataFileWriter`.
#[derive(Clone)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
    inner: B,
}

impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
    /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B) -> Self {
        Self { inner }
    }
}

/// Config for `DataFileWriter`.
pub struct DataFileWriterConfig {
    partition_value: Struct,
}

impl DataFileWriterConfig {
    /// Create a new `DataFileWriterConfig` with the given partition value.
    pub fn new(partition_value: Option<Struct>) -> Self {
        Self {
            partition_value: partition_value.unwrap_or(Struct::empty()),
        }
    }
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
    type R = DataFileWriter<B>;
    type C = DataFileWriterConfig;

    async fn build(self, config: Self::C) -> Result<Self::R> {
        Ok(DataFileWriter {
            inner_writer: Some(self.inner.clone().build().await?),
            partition_value: config.partition_value,
        })
    }
}

/// A writer that writes data within one spec/partition.
pub struct DataFileWriter<B: FileWriterBuilder> {
    inner_writer: Option<B::R>,
    partition_value: Struct,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        self.inner_writer.as_mut().unwrap().write(&batch).await
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        let writer = self.inner_writer.take().unwrap();
        Ok(writer
            .close()
            .await?
            .into_iter()
            .map(|mut res| {
                res.content(DataContentType::Data);
                res.partition(self.partition_value.clone());
                res.build().expect("Guaranteed to be valid")
            })
            .collect_vec())
    }
}

impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
    fn current_file_path(&self) -> String {
        self.inner_writer.as_ref().unwrap().current_file_path()
    }

    fn current_row_num(&self) -> usize {
        self.inner_writer.as_ref().unwrap().current_row_num()
    }

    fn current_written_size(&self) -> usize {
        self.inner_writer.as_ref().unwrap().current_written_size()
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
    use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
    use tempfile::TempDir;

    use crate::{
        io::FileIOBuilder,
        spec::DataFileFormat,
        writer::{
            base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig},
            file_writer::{
                location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
                ParquetWriterBuilder,
            },
            tests::check_parquet_data_file,
            IcebergWriter, IcebergWriterBuilder,
        },
    };

    #[tokio::test]
    async fn test_data_file_writer() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // prepare data
        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
        let schema = {
            let fields = vec![
                arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
                    .with_metadata(HashMap::from([(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        "0".to_string(),
                    )])),
                arrow_schema::Field::new(
                    "col1",
                    arrow_schema::DataType::Struct(
                        vec![arrow_schema::Field::new(
                            "sub_col",
                            arrow_schema::DataType::Int64,
                            true,
                        )
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )]))]
                        .into(),
                    ),
                    true,
                )
                .with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
                ),
                arrow_schema::Field::new(
                    "col3",
                    arrow_schema::DataType::List(Arc::new(
                        arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
                            .with_metadata(HashMap::from([(
                                PARQUET_FIELD_ID_META_KEY.to_string(),
                                "6".to_string(),
                            )])),
                    )),
                    true,
                )
                .with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "3".to_string(),
                )])),
                arrow_schema::Field::new(
                    "col4",
                    arrow_schema::DataType::Struct(
                        vec![arrow_schema::Field::new(
                            "sub_col",
                            arrow_schema::DataType::Struct(
                                vec![arrow_schema::Field::new(
                                    "sub_sub_col",
                                    arrow_schema::DataType::Int64,
                                    true,
                                )
                                .with_metadata(HashMap::from([(
                                    PARQUET_FIELD_ID_META_KEY.to_string(),
                                    "7".to_string(),
                                )]))]
                                .into(),
                            ),
                            true,
                        )
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )]))]
                        .into(),
                    ),
                    true,
                )
                .with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "4".to_string(),
                )])),
            ];
            Arc::new(arrow_schema::Schema::new(fields))
        };
        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        let col1 = Arc::new(StructArray::new(
            vec![
                arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
                    .with_metadata(HashMap::from([(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        "5".to_string(),
                    )])),
            ]
            .into(),
            vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
            None,
        ));
        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
            "test";
            1024
        ])) as ArrayRef;
        let col3 = Arc::new({
            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(
                    vec![Some(1),]
                );
                1024
            ])
            .into_parts();
            arrow_array::ListArray::new(
                Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "6".to_string(),
                )]))),
                list_parts.1,
                list_parts.2,
                list_parts.3,
            )
        }) as ArrayRef;
        let col4 = Arc::new(StructArray::new(
            vec![arrow_schema::Field::new(
                "sub_col",
                arrow_schema::DataType::Struct(
                    vec![arrow_schema::Field::new(
                        "sub_sub_col",
                        arrow_schema::DataType::Int64,
                        true,
                    )
                    .with_metadata(HashMap::from([(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        "7".to_string(),
                    )]))]
                    .into(),
                ),
                true,
            )
            .with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "8".to_string(),
            )]))]
            .into(),
            vec![Arc::new(StructArray::new(
                vec![
                    arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                ]
                .into(),
                vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
                None,
            ))],
            None,
        ));
        let to_write =
            RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();

        // prepare writer
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            to_write.schema(),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut data_file_writer = DataFileWriterBuilder::new(pb)
            .build(DataFileWriterConfig::new(None))
            .await?;

        // write
        data_file_writer.write(to_write.clone()).await?;
        let res = data_file_writer.close().await?;
        assert_eq!(res.len(), 1);
        let data_file = res.into_iter().next().unwrap();

        // check
        check_parquet_data_file(&file_io, &data_file, &to_write).await;

        Ok(())
    }
}
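Stripped of the schema setup, the test above doubles as a usage example. A minimal sketch of the intended call pattern, assuming `schema`, `file_io`, `location_gen`, `file_name_gen`, and a matching `batch` are already in hand (as they are in the test); error handling is elided:

// Build the file-format layer first (Parquet here, as in the test).
let parquet_writer_builder = ParquetWriterBuilder::new(
    WriterProperties::builder().build(),
    schema.clone(),
    file_io.clone(),
    location_gen,
    file_name_gen,
);

// Wrap it in the new DataFileWriter layer. Passing `None` for the
// partition value falls back to `Struct::empty()`, i.e. an
// unpartitioned data file.
let mut writer = DataFileWriterBuilder::new(parquet_writer_builder)
    .build(DataFileWriterConfig::new(None))
    .await?;

writer.write(batch).await?;

// `close()` stamps every builder with `DataContentType::Data` and the
// partition value, then returns the finished `DataFile`s.
let data_files = writer.close().await?;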
crates/iceberg/src/writer/base_writer/mod.rs

Lines changed: 20 additions & 0 deletions (new file)

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Base writer module contains the basic writers provided by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.

pub mod data_file_writer;
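The hunks in this commit use but do not show the `IcebergWriter` traits themselves. For orientation, here is the trait surface implied by the impl blocks in data_file_writer.rs above — a reconstruction, not the verbatim definitions, and the `Send + Clone + 'static` bounds are an assumption by analogy with `FileWriterBuilder`:

// Reconstructed from the impls in data_file_writer.rs; the real
// definitions live elsewhere under crates/iceberg/src/writer/.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder: Send + Clone + 'static {
    /// The writer this builder produces (`DataFileWriter<B>` above).
    type R;
    /// Per-writer configuration (`DataFileWriterConfig` above).
    type C;
    /// Consume the builder and produce a ready-to-use writer.
    async fn build(self, config: Self::C) -> Result<Self::R>;
}

#[async_trait::async_trait]
pub trait IcebergWriter {
    /// Write one Arrow batch into the current file.
    async fn write(&mut self, batch: RecordBatch) -> Result<()>;
    /// Finish writing and return the completed data files.
    async fn close(&mut self) -> Result<Vec<DataFile>>;
}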

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 4 additions & 2 deletions

@@ -17,8 +17,8 @@

 //! This module contains the writer for data file format supported by iceberg: parquet, orc.

-use super::{CurrentFileStatus, DefaultOutput};
-use crate::Result;
+use super::CurrentFileStatus;
+use crate::{spec::DataFileBuilder, Result};
 use arrow_array::RecordBatch;
 use futures::Future;

@@ -28,6 +28,8 @@ mod track_writer;

 pub mod location_generator;

+type DefaultOutput = Vec<DataFileBuilder>;
+
 /// File writer builder trait.
 pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
     /// The associated file writer type.