
Commit 2cae254

committed
init iceberg writer
1 parent 6e29ca7 commit 2cae254

File tree

6 files changed, +490 -104 lines changed


Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@ array-init = "2"
 arrow-arith = { version = ">=46" }
 arrow-array = { version = ">=46" }
 arrow-schema = { version = ">=46" }
+arrow-select = { version = ">=46" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 bimap = "0.6"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ array-init = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
 async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
Lines changed: 310 additions & 0 deletions
@@ -0,0 +1,310 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `DataFileWriter`.

use crate::spec::{DataContentType, DataFileBuilder};
use crate::writer::file_writer::FileWriter;
use crate::writer::CurrentFileStatus;
use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder};
use crate::Result;
use arrow_array::RecordBatch;
use itertools::Itertools;
/// Builder for `DataFileWriter`.
#[derive(Clone)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
    inner: B,
}

impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
    /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B) -> Self {
        Self { inner }
    }
}

#[allow(async_fn_in_trait)]
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
    type R = DataFileWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(DataFileWriter {
            inner_writer: self.inner.clone().build().await?,
            builder: self.inner,
        })
    }
}

/// A writer that writes data within one spec/partition.
pub struct DataFileWriter<B: FileWriterBuilder> {
    builder: B,
    inner_writer: B::R,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
        self.inner_writer.write(&batch).await
    }

    async fn flush(&mut self) -> Result<Vec<DataFileBuilder>> {
        // Swap in a fresh inner file writer, close the old one, and mark each
        // produced data file as `DataContentType::Data`.
        let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?);
        let res = writer
            .close()
            .await?
            .into_iter()
            .map(|mut res| {
                res.content(DataContentType::Data);
                res
            })
            .collect_vec();
        Ok(res)
    }
}

impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
    fn current_file_path(&self) -> String {
        self.inner_writer.current_file_path()
    }

    fn current_row_num(&self) -> usize {
        self.inner_writer.current_row_num()
    }

    fn current_written_size(&self) -> usize {
        self.inner_writer.current_written_size()
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
    use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
    use tempfile::TempDir;

    use crate::{
        io::FileIOBuilder,
        spec::{DataFileFormat, Struct},
        writer::{
            base_writer::data_file_writer::DataFileWriterBuilder,
            file_writer::{
                location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
                ParquetWriterBuilder,
            },
            tests::check_parquet_data_file,
            IcebergWriter, IcebergWriterBuilder,
        },
    };

    #[tokio::test]
    async fn test_data_file_writer() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // prepare data
        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
        let schema = {
            let fields = vec![
                arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
                    .with_metadata(HashMap::from([(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        "0".to_string(),
                    )])),
                arrow_schema::Field::new(
                    "col1",
                    arrow_schema::DataType::Struct(
                        vec![arrow_schema::Field::new(
                            "sub_col",
                            arrow_schema::DataType::Int64,
                            true,
                        )
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "-1".to_string(),
                        )]))]
                        .into(),
                    ),
                    true,
                )
                .with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "1".to_string(),
                )])),
                arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
                ),
                arrow_schema::Field::new(
                    "col3",
                    arrow_schema::DataType::List(Arc::new(
                        arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
                            .with_metadata(HashMap::from([(
                                PARQUET_FIELD_ID_META_KEY.to_string(),
                                "-1".to_string(),
                            )])),
                    )),
                    true,
                )
                .with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "3".to_string(),
                )])),
                arrow_schema::Field::new(
                    "col4",
                    arrow_schema::DataType::Struct(
                        vec![arrow_schema::Field::new(
                            "sub_col",
                            arrow_schema::DataType::Struct(
                                vec![arrow_schema::Field::new(
                                    "sub_sub_col",
                                    arrow_schema::DataType::Int64,
                                    true,
                                )
                                .with_metadata(HashMap::from([(
                                    PARQUET_FIELD_ID_META_KEY.to_string(),
                                    "-1".to_string(),
                                )]))]
                                .into(),
                            ),
                            true,
                        )
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "-1".to_string(),
                        )]))]
                        .into(),
                    ),
                    true,
                )
                .with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "4".to_string(),
                )])),
            ];
            Arc::new(arrow_schema::Schema::new(fields))
        };
        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        let col1 = Arc::new(StructArray::new(
            vec![
                arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
                    .with_metadata(HashMap::from([(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        "-1".to_string(),
                    )])),
            ]
            .into(),
            vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
            None,
        ));
        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
            "test";
            1024
        ])) as ArrayRef;
        let col3 = Arc::new({
            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(
                    vec![Some(1),]
                );
                1024
            ])
            .into_parts();
            arrow_array::ListArray::new(
                Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    "-1".to_string(),
                )]))),
                list_parts.1,
                list_parts.2,
                list_parts.3,
            )
        }) as ArrayRef;
        let col4 = Arc::new(StructArray::new(
            vec![arrow_schema::Field::new(
                "sub_col",
                arrow_schema::DataType::Struct(
                    vec![arrow_schema::Field::new(
                        "sub_sub_col",
                        arrow_schema::DataType::Int64,
                        true,
                    )
                    .with_metadata(HashMap::from([(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        "-1".to_string(),
                    )]))]
                    .into(),
                ),
                true,
            )
            .with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "-1".to_string(),
            )]))]
            .into(),
            vec![Arc::new(StructArray::new(
                vec![
                    arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "-1".to_string(),
                        )])),
                ]
                .into(),
                vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
                None,
            ))],
            None,
        ));
        let to_write =
            RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();

        // prepare writer
        let pb = ParquetWriterBuilder::new(
            0,
            WriterProperties::builder().build(),
            to_write.schema(),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut data_file_writer = DataFileWriterBuilder::new(pb).build().await?;

        for _ in 0..3 {
            // write
            data_file_writer.write(to_write.clone()).await?;
            let res = data_file_writer.flush().await?;
            assert_eq!(res.len(), 1);
            let data_file = res
                .into_iter()
                .next()
                .unwrap()
                .partition(Struct::empty())
                .build()
                .unwrap();

            // check
            check_parquet_data_file(&file_io, &data_file, &to_write).await;
        }

        Ok(())
    }
}
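
The test above exercises the full write path. A minimal usage sketch of the same flow, assuming a `ParquetWriterBuilder` configured with a `FileIO`, a location generator, and a file-name generator as in the test (the names `schema`, `batch`, `file_io`, `location_gen`, and `file_name_gen` are illustrative placeholders):

    // Wrap any FileWriterBuilder (here: Parquet) in the new DataFileWriterBuilder.
    let parquet_builder = ParquetWriterBuilder::new(
        0,
        WriterProperties::builder().build(),
        schema.clone(),
        file_io.clone(),
        location_gen,
        file_name_gen,
    );
    let mut writer = DataFileWriterBuilder::new(parquet_builder).build().await?;

    // write() buffers a RecordBatch into the current inner file writer.
    writer.write(batch).await?;

    // flush() swaps in a fresh inner writer, closes the old one, and returns the
    // resulting DataFileBuilders with their content type set to Data.
    let data_files: Vec<DataFileBuilder> = writer.flush().await?;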
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Base writer module contains the basic writers provided by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.

pub mod data_file_writer;
