
Commit 1138147

add parquet writer

1 parent d9d6cfc commit 1138147

File tree: 8 files changed, +471 −31 lines

Cargo.toml

Lines changed: 2 additions & 0 deletions

@@ -36,6 +36,7 @@ arrow-schema = { version = ">=46" }
 async-trait = "0.1"
 bimap = "0.6"
 bitvec = "1.0.1"
+bytes = "1.5"
 chrono = "0.4"
 derive_builder = "0.12.0"
 either = "1"
@@ -53,6 +54,7 @@ opendal = "0.44"
 ordered-float = "4.0.0"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
+parquet = { version = ">=46", features = ["async"] }
 reqwest = { version = "^0.11", features = ["json"] }
 rust_decimal = "1.31.0"
 serde = { version = "^1.0", features = ["rc"] }
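
The `parquet` dependency is pulled in with its `async` feature, which exposes `AsyncArrowWriter`, the tokio-based writer the new `ParquetWriter` presumably wraps. A minimal sketch of what that feature enables (not from this commit; the four-argument `try_new` matches the parquet 46.x line pinned here, and later releases drop the buffer-size parameter):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::AsyncArrowWriter;

// Sketch: stream a RecordBatch into any tokio::io::AsyncWrite sink.
// The file name and the buffer-size argument (0 = flush eagerly) are
// illustrative only.
async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("id", ids)])?;

    let file = tokio::fs::File::create("demo.parquet").await?;
    let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), 0, None)?;
    writer.write(&batch).await?;
    writer.close().await?; // writes the parquet footer
    Ok(())
}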

crates/iceberg/Cargo.toml

Lines changed: 3 additions & 1 deletion

@@ -36,6 +36,7 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
+bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 either = { workspace = true }
@@ -47,6 +48,7 @@ murmur3 = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
+parquet = { workspace = true }
 reqwest = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
@@ -55,6 +57,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 urlencoding = { workspace = true }
@@ -64,4 +67,3 @@ uuid = { workspace = true }
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
 tera = { workspace = true }
-tokio = { workspace = true }

crates/iceberg/src/io.rs

Lines changed: 7 additions & 4 deletions

@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use once_cell::sync::Lazy;
 use opendal::{Operator, Scheme};
+use tokio::io::AsyncWrite as TokioAsyncWrite;
 use url::Url;

 /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -240,9 +241,9 @@ impl InputFile {
 }

 /// Trait for writing file.
-pub trait FileWrite: AsyncWrite {}
+pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}

-impl<T> FileWrite for T where T: AsyncWrite {}
+impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}

 /// Output file is used for writing to files..
 #[derive(Debug)]
@@ -278,8 +279,10 @@ impl OutputFile {
     }

     /// Creates output file for writing.
-    pub async fn writer(&self) -> Result<impl FileWrite> {
-        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+    pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+        Ok(Box::new(
+            self.op.writer(&self.path[self.relative_path_pos..]).await?,
+        ))
     }
 }
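
`FileWrite` now also demands tokio's `AsyncWrite` plus `Send + Unpin`, which is what parquet's async writer requires, and `writer()` returns a boxed trait object rather than an opaque `impl FileWrite`. A minimal usage sketch, not part of the commit (it assumes the `OutputFile` was obtained from `FileIO::new_output` elsewhere):

use iceberg::io::OutputFile;
use iceberg::Result;
use tokio::io::AsyncWriteExt; // write_all/shutdown come from tokio's ext trait

// Sketch: Box<dyn FileWrite> is tokio AsyncWrite + Unpin, so the usual
// tokio helpers apply to it directly.
async fn write_demo(out: OutputFile) -> Result<()> {
    let mut w = out.writer().await?;
    w.write_all(b"hello").await?;
    w.shutdown().await?; // flush and close the underlying opendal writer
    Ok(())
}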

crates/iceberg/src/scan.rs

Lines changed: 10 additions & 7 deletions

@@ -188,7 +188,7 @@ impl FileScanTask {
 mod tests {
     use crate::io::{FileIO, OutputFile};
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
         ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
     };
@@ -351,14 +351,15 @@ mod tests {
                 ManifestEntry::builder()
                     .status(ManifestStatus::Added)
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/1.parquet", &fixture.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(100))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
                 ManifestEntry::builder()
@@ -367,14 +368,15 @@ mod tests {
                     .sequence_number(parent_snapshot.sequence_number())
                     .file_sequence_number(parent_snapshot.sequence_number())
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/2.parquet", &fixture.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(200))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
                 ManifestEntry::builder()
@@ -383,14 +385,15 @@ mod tests {
                     .sequence_number(parent_snapshot.sequence_number())
                     .file_sequence_number(parent_snapshot.sequence_number())
                     .data_file(
-                        DataFile::builder()
+                        DataFileBuilder::default()
                             .content(DataContentType::Data)
                             .file_path(format!("{}/3.parquet", &fixture.table_location))
                             .file_format(DataFileFormat::Parquet)
                             .file_size_in_bytes(100)
                             .record_count(1)
                             .partition(Struct::from_iter([Some(Literal::long(300))]))
-                            .build(),
+                            .build()
+                            .unwrap(),
                     )
                     .build(),
             ],

crates/iceberg/src/spec/manifest.rs

Lines changed: 17 additions & 17 deletions

@@ -924,34 +924,34 @@ impl TryFrom<i32> for ManifestStatus {
 }

 /// Data file carries data file path, partition tuple, metrics, …
-#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
+#[derive(Debug, PartialEq, Clone, Eq, Builder)]
 pub struct DataFile {
     /// field id: 134
     ///
     /// Type of content stored by the data file: data, equality deletes,
     /// or position deletes (all v1 files are data files)
-    content: DataContentType,
+    pub(crate) content: DataContentType,
     /// field id: 100
     ///
     /// Full URI for the file with FS scheme
-    file_path: String,
+    pub(crate) file_path: String,
     /// field id: 101
     ///
     /// String file format name, avro, orc or parquet
-    file_format: DataFileFormat,
+    pub(crate) file_format: DataFileFormat,
     /// field id: 102
     ///
     /// Partition data tuple, schema based on the partition spec output using
     /// partition field ids for the struct field ids
-    partition: Struct,
+    pub(crate) partition: Struct,
     /// field id: 103
     ///
     /// Number of records in this file
-    record_count: u64,
+    pub(crate) record_count: u64,
     /// field id: 104
     ///
     /// Total file size in bytes
-    file_size_in_bytes: u64,
+    pub(crate) file_size_in_bytes: u64,
     /// field id: 108
     /// key field id: 117
     /// value field id: 118
@@ -960,29 +960,29 @@ pub struct DataFile {
     /// store the column. Does not include bytes necessary to read other
     /// columns, like footers. Leave null for row-oriented formats (Avro)
     #[builder(default)]
-    column_sizes: HashMap<i32, u64>,
+    pub(crate) column_sizes: HashMap<i32, u64>,
     /// field id: 109
     /// key field id: 119
     /// value field id: 120
     ///
     /// Map from column id to number of values in the column (including null
     /// and NaN values)
     #[builder(default)]
-    value_counts: HashMap<i32, u64>,
+    pub(crate) value_counts: HashMap<i32, u64>,
     /// field id: 110
     /// key field id: 121
     /// value field id: 122
     ///
     /// Map from column id to number of null values in the column
     #[builder(default)]
-    null_value_counts: HashMap<i32, u64>,
+    pub(crate) null_value_counts: HashMap<i32, u64>,
     /// field id: 137
     /// key field id: 138
     /// value field id: 139
     ///
     /// Map from column id to number of NaN values in the column
     #[builder(default)]
-    nan_value_counts: HashMap<i32, u64>,
+    pub(crate) nan_value_counts: HashMap<i32, u64>,
     /// field id: 125
     /// key field id: 126
     /// value field id: 127
@@ -995,7 +995,7 @@ pub struct DataFile {
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    lower_bounds: HashMap<i32, Literal>,
+    pub(crate) lower_bounds: HashMap<i32, Literal>,
     /// field id: 128
     /// key field id: 129
     /// value field id: 130
@@ -1008,19 +1008,19 @@ pub struct DataFile {
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    upper_bounds: HashMap<i32, Literal>,
+    pub(crate) upper_bounds: HashMap<i32, Literal>,
     /// field id: 131
     ///
     /// Implementation-specific key metadata for encryption
     #[builder(default)]
-    key_metadata: Vec<u8>,
+    pub(crate) key_metadata: Vec<u8>,
     /// field id: 132
     /// element field id: 133
     ///
     /// Split offsets for the data file. For example, all row group offsets
     /// in a Parquet file. Must be sorted ascending
     #[builder(default)]
-    split_offsets: Vec<i64>,
+    pub(crate) split_offsets: Vec<i64>,
     /// field id: 135
     /// element field id: 136
     ///
@@ -1029,7 +1029,7 @@ pub struct DataFile {
     /// otherwise. Fields with ids listed in this column must be present
     /// in the delete file
     #[builder(default)]
-    equality_ids: Vec<i32>,
+    pub(crate) equality_ids: Vec<i32>,
     /// field id: 140
     ///
     /// ID representing sort order for this file.
@@ -1041,7 +1041,7 @@ pub struct DataFile {
     /// order id to null. Readers must ignore sort order id for position
     /// delete files.
     #[builder(default, setter(strip_option))]
-    sort_order_id: Option<i32>,
+    pub(crate) sort_order_id: Option<i32>,
 }

 /// Type of content stored by the data file: data, equality deletes, or
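
Swapping `TypedBuilder` for `derive_builder`'s `Builder` is what forced the `.unwrap()` calls in the scan.rs tests above: required fields are now checked when `build()` runs rather than by the type system, and `build()` returns a `Result`. A sketch of the new pattern with placeholder values:

use iceberg::spec::{
    DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
};

// Sketch: with derive_builder, a missing required field surfaces as an
// Err from build() instead of a compile error as under TypedBuilder.
fn example_data_file() -> DataFile {
    DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("s3://bucket/table/1.parquet".to_string()) // placeholder path
        .file_format(DataFileFormat::Parquet)
        .partition(Struct::from_iter([Some(Literal::long(100))]))
        .record_count(1)
        .file_size_in_bytes(100)
        .build()
        .expect("all required DataFile fields set")
}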

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 6 additions & 2 deletions

@@ -18,16 +18,20 @@
 //! This module contains the writer for data file format supported by iceberg: parquet, orc.

 use super::{CurrentFileStatus, DefaultOutput};
-use crate::Result;
+use crate::{io::OutputFile, Result};
 use arrow_array::RecordBatch;
 use futures::Future;

+mod parquet_writer;
+pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
+mod track_writer;
+
 /// File writer builder trait.
 pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
     /// The associated file writer type.
     type R: FileWriter<O>;
     /// Build file writer.
-    fn build(self) -> impl Future<Output = Result<Self::R>> + Send;
+    fn build(self, out_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
 }

 /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc)
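
`FileWriterBuilder::build` now receives the target `OutputFile` instead of the builder capturing it up front, so a single cloned builder can mint writers for many files, which is the shape a rolling file writer needs. A hypothetical call site under the new contract (the helper name and module path are assumptions, not part of the commit):

use iceberg::io::OutputFile;
use iceberg::writer::file_writer::FileWriterBuilder;
use iceberg::Result;

// Sketch: one builder, many files. Under the old zero-argument build(),
// each output file would have required its own builder instance.
async fn open_writers<B: FileWriterBuilder>(
    builder: B,
    files: Vec<OutputFile>,
) -> Result<Vec<B::R>> {
    let mut writers = Vec::with_capacity(files.len());
    for f in files {
        writers.push(builder.clone().build(f).await?);
    }
    Ok(writers)
}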
