From 2db1c802b4310ecb074a75dfe332032af5b3a73d Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 2 Feb 2024 17:07:32 +0800 Subject: [PATCH 1/6] add parquet writer --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 - crates/iceberg/src/io.rs | 11 +- crates/iceberg/src/scan.rs | 2 +- crates/iceberg/src/spec/manifest.rs | 34 +- crates/iceberg/src/writer/file_writer/mod.rs | 8 +- .../src/writer/file_writer/parquet_writer.rs | 354 ++++++++++++++++++ .../src/writer/file_writer/track_writer.rs | 72 ++++ 8 files changed, 458 insertions(+), 25 deletions(-) create mode 100644 crates/iceberg/src/writer/file_writer/parquet_writer.rs create mode 100644 crates/iceberg/src/writer/file_writer/track_writer.rs diff --git a/Cargo.toml b/Cargo.toml index 3234bd07a4..9935a607ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ ordered-float = "4.0.0" parquet = "50" pretty_assertions = "1.4.0" port_scanner = "0.1.5" +parquet = { version = ">=46", features = ["async"] } reqwest = { version = "^0.11", features = ["json"] } rust_decimal = "1.31.0" serde = { version = "^1.0", features = ["rc"] } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 181832cff0..c8728743f9 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -70,4 +70,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] } pretty_assertions = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } -tokio = { workspace = true } diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs index 410d870768..d1bde2a0a6 100644 --- a/crates/iceberg/src/io.rs +++ b/crates/iceberg/src/io.rs @@ -55,6 +55,7 @@ use futures::{AsyncRead, AsyncSeek, AsyncWrite}; use once_cell::sync::Lazy; use opendal::{Operator, Scheme}; use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek}; +use tokio::io::AsyncWrite as TokioAsyncWrite; use url::Url; /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3). @@ -244,9 +245,9 @@ impl InputFile { } /// Trait for writing file. -pub trait FileWrite: AsyncWrite {} +pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {} -impl FileWrite for T where T: AsyncWrite {} +impl FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {} /// Output file is used for writing to files.. #[derive(Debug)] @@ -282,8 +283,10 @@ impl OutputFile { } /// Creates output file for writing. - pub async fn writer(&self) -> Result { - Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?) 
+ pub async fn writer(&self) -> Result> { + Ok(Box::new( + self.op.writer(&self.path[self.relative_path_pos..]).await?, + )) } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index cca26b61be..3d2cbfae34 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -211,7 +211,7 @@ impl FileScanTask { mod tests { use crate::io::{FileIO, OutputFile}; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest, + DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, }; diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 5a82007493..a5d0fa944e 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -932,34 +932,34 @@ impl TryFrom for ManifestStatus { } /// Data file carries data file path, partition tuple, metrics, … -#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)] +#[derive(Debug, PartialEq, Clone, Eq, Builder)] pub struct DataFile { /// field id: 134 /// /// Type of content stored by the data file: data, equality deletes, /// or position deletes (all v1 files are data files) - content: DataContentType, + pub(crate) content: DataContentType, /// field id: 100 /// /// Full URI for the file with FS scheme - file_path: String, + pub(crate) file_path: String, /// field id: 101 /// /// String file format name, avro, orc or parquet - file_format: DataFileFormat, + pub(crate) file_format: DataFileFormat, /// field id: 102 /// /// Partition data tuple, schema based on the partition spec output using /// partition field ids for the struct field ids - partition: Struct, + pub(crate) partition: Struct, /// field id: 103 /// /// Number of records in this file - record_count: u64, + pub(crate) record_count: u64, /// field id: 104 /// /// Total file size in bytes - file_size_in_bytes: u64, + pub(crate) file_size_in_bytes: u64, /// field id: 108 /// key field id: 117 /// value field id: 118 @@ -968,7 +968,7 @@ pub struct DataFile { /// store the column. Does not include bytes necessary to read other /// columns, like footers. 
Leave null for row-oriented formats (Avro) #[builder(default)] - column_sizes: HashMap, + pub(crate) column_sizes: HashMap, /// field id: 109 /// key field id: 119 /// value field id: 120 @@ -976,21 +976,21 @@ pub struct DataFile { /// Map from column id to number of values in the column (including null /// and NaN values) #[builder(default)] - value_counts: HashMap, + pub(crate) value_counts: HashMap, /// field id: 110 /// key field id: 121 /// value field id: 122 /// /// Map from column id to number of null values in the column #[builder(default)] - null_value_counts: HashMap, + pub(crate) null_value_counts: HashMap, /// field id: 137 /// key field id: 138 /// value field id: 139 /// /// Map from column id to number of NaN values in the column #[builder(default)] - nan_value_counts: HashMap, + pub(crate) nan_value_counts: HashMap, /// field id: 125 /// key field id: 126 /// value field id: 127 @@ -1003,7 +1003,7 @@ pub struct DataFile { /// /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) #[builder(default)] - lower_bounds: HashMap, + pub(crate) lower_bounds: HashMap, /// field id: 128 /// key field id: 129 /// value field id: 130 @@ -1016,19 +1016,19 @@ pub struct DataFile { /// /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) #[builder(default)] - upper_bounds: HashMap, + pub(crate) upper_bounds: HashMap, /// field id: 131 /// /// Implementation-specific key metadata for encryption #[builder(default)] - key_metadata: Vec, + pub(crate) key_metadata: Vec, /// field id: 132 /// element field id: 133 /// /// Split offsets for the data file. For example, all row group offsets /// in a Parquet file. Must be sorted ascending #[builder(default)] - split_offsets: Vec, + pub(crate) split_offsets: Vec, /// field id: 135 /// element field id: 136 /// @@ -1037,7 +1037,7 @@ pub struct DataFile { /// otherwise. Fields with ids listed in this column must be present /// in the delete file #[builder(default)] - equality_ids: Vec, + pub(crate) equality_ids: Vec, /// field id: 140 /// /// ID representing sort order for this file. @@ -1049,7 +1049,7 @@ pub struct DataFile { /// order id to null. Readers must ignore sort order id for position /// delete files. #[builder(default, setter(strip_option))] - sort_order_id: Option, + pub(crate) sort_order_id: Option, } /// Type of content stored by the data file: data, equality deletes, or diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index c8251fde77..f6d75c3ede 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -18,16 +18,20 @@ //! This module contains the writer for data file format supported by iceberg: parquet, orc. use super::{CurrentFileStatus, DefaultOutput}; -use crate::Result; +use crate::{io::OutputFile, Result}; use arrow_array::RecordBatch; use futures::Future; +mod parquet_writer; +pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; +mod track_writer; + /// File writer builder trait. pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. - fn build(self) -> impl Future> + Send; + fn build(self, out_file: OutputFile) -> impl Future> + Send; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. 
orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs new file mode 100644 index 0000000000..1556c76e19 --- /dev/null +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The module contains the file writer for parquet file format. + +use std::{ + cmp::max, + collections::HashMap, + sync::{atomic::AtomicI64, Arc}, +}; + +use crate::Result; +use crate::{ + io::OutputFile, + spec::{DataFileBuilder, DataFileFormat}, + writer::CurrentFileStatus, + Error, +}; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use parquet::{arrow::AsyncArrowWriter, format::FileMetaData}; +use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties}; + +use super::{track_writer::TrackWriter, FileWriter, FileWriterBuilder}; + +/// ParquetWriterBuilder is used to builder a [`ParquetWriter`] +#[derive(Clone)] +pub struct ParquetWriterBuilder { + /// `buffer_size` determines the initial size of the intermediate buffer. + /// The intermediate buffer will automatically be resized if necessary + init_buffer_size: usize, + props: WriterProperties, + schema: ArrowSchemaRef, +} + +impl ParquetWriterBuilder { + /// To avoid EntiryTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller than it. + const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024; + + /// Create a new `ParquetWriterBuilder` + /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field. + pub fn new(init_buffer_size: usize, props: WriterProperties, schema: ArrowSchemaRef) -> Self { + Self { + init_buffer_size, + props, + schema, + } + } +} + +impl FileWriterBuilder for ParquetWriterBuilder { + type R = ParquetWriter; + + async fn build(self, out_file: OutputFile) -> crate::Result { + // Fetch field id from schema + let field_ids = self + .schema + .fields() + .iter() + .map(|field| { + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .ok_or_else(|| { + Error::new( + crate::ErrorKind::Unexpected, + "Field id not found in arrow schema metadata.", + ) + })? 
+ .parse::() + .map_err(|err| { + Error::new(crate::ErrorKind::Unexpected, "Failed to parse field id.") + .with_source(err) + }) + }) + .collect::>>()?; + + let written_size = Arc::new(AtomicI64::new(0)); + let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone()); + let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size); + let writer = AsyncArrowWriter::try_new( + inner_writer, + self.schema.clone(), + init_buffer_size, + Some(self.props), + ) + .map_err(|err| { + Error::new( + crate::ErrorKind::Unexpected, + "Failed to build parquet writer.", + ) + .with_source(err) + })?; + + Ok(ParquetWriter { + writer, + written_size, + current_row_num: 0, + out_file, + field_ids, + }) + } +} + +/// `ParquetWriter`` is used to write arrow data into parquet file on storage. +pub struct ParquetWriter { + out_file: OutputFile, + writer: AsyncArrowWriter, + written_size: Arc, + current_row_num: usize, + field_ids: Vec, +} + +impl ParquetWriter { + fn to_data_file_builder( + field_ids: &[i32], + metadata: FileMetaData, + written_size: usize, + file_path: String, + ) -> Result { + // Only enter here when the file is not empty. + assert!(!metadata.row_groups.is_empty()); + if field_ids.len() != metadata.row_groups[0].columns.len() { + return Err(Error::new( + crate::ErrorKind::Unexpected, + "Len of field id is not match with len of columns in parquet metadata.", + )); + } + + let (column_sizes, value_counts, null_value_counts) = + { + let mut per_col_size: HashMap = HashMap::new(); + let mut per_col_val_num: HashMap = HashMap::new(); + let mut per_col_null_val_num: HashMap = HashMap::new(); + metadata.row_groups.iter().for_each(|group| { + group.columns.iter().zip(field_ids.iter()).for_each( + |(column_chunk, &field_id)| { + if let Some(column_chunk_metadata) = &column_chunk.meta_data { + *per_col_size.entry(field_id).or_insert(0) += + column_chunk_metadata.total_compressed_size as u64; + *per_col_val_num.entry(field_id).or_insert(0) += + column_chunk_metadata.num_values as u64; + *per_col_null_val_num.entry(field_id).or_insert(0_u64) += + column_chunk_metadata + .statistics + .as_ref() + .map(|s| s.null_count) + .unwrap_or(None) + .unwrap_or(0) as u64; + } + }, + ) + }); + (per_col_size, per_col_val_num, per_col_null_val_num) + }; + + let mut builder = DataFileBuilder::default(); + builder + .file_path(file_path) + .file_format(DataFileFormat::Parquet) + .record_count(metadata.num_rows as u64) + .file_size_in_bytes(written_size as u64) + .column_sizes(column_sizes) + .value_counts(value_counts) + .null_value_counts(null_value_counts) + // # TODO + // - nan_value_counts + // - lower_bounds + // - upper_bounds + .key_metadata(metadata.footer_signing_key_metadata.unwrap_or_default()) + .split_offsets( + metadata + .row_groups + .iter() + .filter_map(|group| group.file_offset) + .collect(), + ); + Ok(builder) + } +} + +impl FileWriter for ParquetWriter { + async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> { + self.current_row_num += batch.num_rows(); + self.writer.write(batch).await.map_err(|err| { + Error::new( + crate::ErrorKind::Unexpected, + "Failed to write using parquet writer.", + ) + .with_source(err) + })?; + Ok(()) + } + + async fn close(self) -> crate::Result> { + let metadata = self.writer.close().await.map_err(|err| { + Error::new( + crate::ErrorKind::Unexpected, + "Failed to close parquet writer.", + ) + .with_source(err) + })?; + + let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed); + + 
Ok(vec![Self::to_data_file_builder( + &self.field_ids, + metadata, + written_size as usize, + self.out_file.location().to_string(), + )?]) + } +} + +impl CurrentFileStatus for ParquetWriter { + fn current_file_path(&self) -> String { + self.out_file.location().to_string() + } + + fn current_row_num(&self) -> usize { + self.current_row_num + } + + fn current_written_size(&self) -> usize { + self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use anyhow::Result; + use arrow_array::ArrayRef; + use arrow_array::Int64Array; + use arrow_array::RecordBatch; + use bytes::Bytes; + use futures::AsyncReadExt; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::Struct; + + #[derive(Clone)] + struct TestLocationGen; + + #[tokio::test] + async fn test_parquet_writer() -> Result<()> { + // create output file + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("test.parquet"); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let out_file = file_io.new_output(path.to_str().unwrap()).unwrap(); + + // prepare data + let schema = { + let fields = vec![ + arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef; + let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap(); + let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap(); + + // write data + let mut pw = + ParquetWriterBuilder::new(0, WriterProperties::builder().build(), to_write.schema()) + .build(out_file) + .await?; + pw.write(&to_write).await?; + pw.write(&to_write_null).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + // Put dummy field for build successfully. 
+ .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + + // read the written file + let mut input_file = file_io + .new_input(data_file.file_path.clone()) + .unwrap() + .reader() + .await + .unwrap(); + let mut res = vec![]; + let file_size = input_file.read_to_end(&mut res).await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); + let metadata = reader_builder.metadata().clone(); + + // check data + let mut reader = reader_builder.build().unwrap(); + let res = reader.next().unwrap().unwrap(); + assert_eq!(to_write, res); + let res = reader.next().unwrap().unwrap(); + assert_eq!(to_write_null, res); + + // check metadata + assert_eq!(metadata.num_row_groups(), 1); + assert_eq!(metadata.row_group(0).num_columns(), 1); + assert_eq!(data_file.file_format, DataFileFormat::Parquet); + assert_eq!( + data_file.record_count, + metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64 + ); + assert_eq!(data_file.file_size_in_bytes, file_size as u64); + assert_eq!(data_file.column_sizes.len(), 1); + assert_eq!( + *data_file.column_sizes.get(&0).unwrap(), + metadata.row_group(0).column(0).compressed_size() as u64 + ); + assert_eq!(data_file.value_counts.len(), 1); + assert_eq!(*data_file.value_counts.get(&0).unwrap(), 2048); + assert_eq!(data_file.null_value_counts.len(), 1); + assert_eq!(*data_file.null_value_counts.get(&0).unwrap(), 1024); + assert_eq!(data_file.key_metadata.len(), 0); + assert_eq!(data_file.split_offsets.len(), 1); + assert_eq!( + *data_file.split_offsets.first().unwrap(), + metadata.row_group(0).file_offset().unwrap() + ); + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs new file mode 100644 index 0000000000..938addd086 --- /dev/null +++ b/crates/iceberg/src/writer/file_writer/track_writer.rs @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + pin::Pin, + sync::{atomic::AtomicI64, Arc}, +}; + +use tokio::io::AsyncWrite; + +use crate::io::FileWrite; + +/// `TrackWriter` is used to track the written size. 
+pub(crate) struct TrackWriter { + inner: Box, + written_size: Arc, +} + +impl TrackWriter { + pub fn new(writer: Box, written_size: Arc) -> Self { + Self { + inner: writer, + written_size, + } + } +} + +impl AsyncWrite for TrackWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match Pin::new(&mut self.inner).poll_write(cx, buf) { + std::task::Poll::Ready(Ok(n)) => { + self.written_size + .fetch_add(buf.len() as i64, std::sync::atomic::Ordering::Relaxed); + std::task::Poll::Ready(Ok(n)) + } + std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} From 7582308f286309f365aa492280e21a7a51786402 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sat, 3 Feb 2024 14:19:35 +0800 Subject: [PATCH 2/6] refine parquet writer --- .../writer/file_writer/location_generator.rs | 108 ++++++++++++++++++ crates/iceberg/src/writer/file_writer/mod.rs | 6 +- .../src/writer/file_writer/parquet_writer.rs | 58 +++++++--- 3 files changed, 156 insertions(+), 16 deletions(-) create mode 100644 crates/iceberg/src/writer/file_writer/location_generator.rs diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs new file mode 100644 index 0000000000..009fa6b79d --- /dev/null +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the location generator and file name generator for generating path of data file. + +use std::sync::{atomic::AtomicU64, Arc}; + +/// `LocationGenerator` used to generate the location of data file. +pub trait LocationGenerator: Clone + Send + 'static { + /// Generate a absolute path for the given file name. + /// e.g + /// For file name "part-00000.parquet", the generated location maybe "/table/data/part-00000.parquet" + fn generate_location(&self, file_name: &str) -> String; +} + +/// `FileNameGeneratorTrait` used to generate file name for data file. The file name can be passed to `LocationGenerator` to generate the location of the file. +pub trait FileNameGenerator: Clone + Send + 'static { + /// Generate a file name. + fn generate_file_name(&self) -> String; +} + +/// `DefaultFileNameGenerator` used to generate file name for data file. 
The file name can be +/// passed to `LocationGenerator` to generate the location of the file. +/// The rule of file name is aligned with the OutputFileFactory in iceberg-java +#[derive(Clone)] +pub struct DefaultFileNameGenerator { + partition_id: u64, + task_id: u64, + // The purpose of this id is to be able to know from two paths that they were written by the + // same operation. + // That's useful, for example, if a Spark job dies and leaves files in the file system, you can + // identify them all + // with a recursive listing and grep. + operator_id: String, + suffix: String, + file_count: Arc, +} + +impl DefaultFileNameGenerator { + /// Create a new `FileNameGenerator`. + pub fn new( + partition_id: u64, + task_id: u64, + operator_id: String, + suffix: Option, + ) -> Self { + let suffix = if let Some(suffix) = suffix { + format!("--{}", suffix) + } else { + "".to_string() + }; + Self { + partition_id, + task_id, + operator_id, + suffix, + file_count: Arc::new(AtomicU64::new(0)), + } + } +} + +impl FileNameGenerator for DefaultFileNameGenerator { + fn generate_file_name(&self) -> String { + let file_id = self + .file_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + format!( + "{:05}-{}-{}-{:05}{}", + self.partition_id, self.task_id, self.operator_id, file_id, self.suffix + ) + } +} + +#[cfg(test)] +pub(crate) mod test { + use super::LocationGenerator; + + #[derive(Clone)] + pub(crate) struct MockLocationGenerator { + root: String, + } + + impl MockLocationGenerator { + pub(crate) fn new(root: String) -> Self { + Self { root } + } + } + + impl LocationGenerator for MockLocationGenerator { + fn generate_location(&self, file_name: &str) -> String { + format!("{}/{}", self.root, file_name) + } + } +} diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index f6d75c3ede..f2848f4d44 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -18,7 +18,7 @@ //! This module contains the writer for data file format supported by iceberg: parquet, orc. use super::{CurrentFileStatus, DefaultOutput}; -use crate::{io::OutputFile, Result}; +use crate::Result; use arrow_array::RecordBatch; use futures::Future; @@ -26,12 +26,14 @@ mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; mod track_writer; +pub mod location_generator; + /// File writer builder trait. pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. - fn build(self, out_file: OutputFile) -> impl Future> + Send; + fn build(self) -> impl Future> + Send; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. 
orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 1556c76e19..a7236654d3 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -23,7 +23,7 @@ use std::{ sync::{atomic::AtomicI64, Arc}, }; -use crate::Result; +use crate::{io::FileIO, Result}; use crate::{ io::OutputFile, spec::{DataFileBuilder, DataFileFormat}, @@ -34,37 +34,55 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use parquet::{arrow::AsyncArrowWriter, format::FileMetaData}; use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties}; -use super::{track_writer::TrackWriter, FileWriter, FileWriterBuilder}; +use super::{ + location_generator::{FileNameGenerator, LocationGenerator}, + track_writer::TrackWriter, + FileWriter, FileWriterBuilder, +}; /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone)] -pub struct ParquetWriterBuilder { +pub struct ParquetWriterBuilder { /// `buffer_size` determines the initial size of the intermediate buffer. /// The intermediate buffer will automatically be resized if necessary init_buffer_size: usize, props: WriterProperties, schema: ArrowSchemaRef, + + file_io: FileIO, + location_generator: T, + file_name_generator: F, } -impl ParquetWriterBuilder { +impl ParquetWriterBuilder { /// To avoid EntiryTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller than it. const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024; /// Create a new `ParquetWriterBuilder` /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field. - pub fn new(init_buffer_size: usize, props: WriterProperties, schema: ArrowSchemaRef) -> Self { + pub fn new( + init_buffer_size: usize, + props: WriterProperties, + schema: ArrowSchemaRef, + file_io: FileIO, + location_generator: T, + file_name_generator: F, + ) -> Self { Self { init_buffer_size, props, schema, + file_io, + location_generator, + file_name_generator, } } } -impl FileWriterBuilder for ParquetWriterBuilder { +impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; - async fn build(self, out_file: OutputFile) -> crate::Result { + async fn build(self) -> crate::Result { // Fetch field id from schema let field_ids = self .schema @@ -89,6 +107,10 @@ impl FileWriterBuilder for ParquetWriterBuilder { .collect::>>()?; let written_size = Arc::new(AtomicI64::new(0)); + let out_file = self.file_io.new_output( + self.location_generator + .generate_location(&self.file_name_generator.generate_file_name()), + )?; let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone()); let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size); let writer = AsyncArrowWriter::try_new( @@ -256,17 +278,19 @@ mod tests { use super::*; use crate::io::FileIOBuilder; use crate::spec::Struct; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; #[derive(Clone)] struct TestLocationGen; #[tokio::test] async fn test_parquet_writer() -> Result<()> { - // create output file let temp_dir = TempDir::new().unwrap(); - let path = temp_dir.path().join("test.parquet"); let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - let out_file = file_io.new_output(path.to_str().unwrap()).unwrap(); + let loccation_gen = + 
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = DefaultFileNameGenerator::new(0, 0, "test".to_string(), None); // prepare data let schema = { @@ -283,10 +307,16 @@ mod tests { let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap(); // write data - let mut pw = - ParquetWriterBuilder::new(0, WriterProperties::builder().build(), to_write.schema()) - .build(out_file) - .await?; + let mut pw = ParquetWriterBuilder::new( + 0, + WriterProperties::builder().build(), + to_write.schema(), + file_io.clone(), + loccation_gen, + file_name_gen, + ) + .build() + .await?; pw.write(&to_write).await?; pw.write(&to_write_null).await?; let res = pw.close().await?; From 759465501b0b6e0a2ebcb89692fb40f76db52b0f Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 5 Feb 2024 19:28:45 +0800 Subject: [PATCH 3/6] refine --- .../writer/file_writer/location_generator.rs | 35 +++++++------------ .../src/writer/file_writer/parquet_writer.rs | 3 +- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index 009fa6b79d..929f97dbe8 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -19,9 +19,11 @@ use std::sync::{atomic::AtomicU64, Arc}; +use crate::spec::DataFileFormat; + /// `LocationGenerator` used to generate the location of data file. pub trait LocationGenerator: Clone + Send + 'static { - /// Generate a absolute path for the given file name. + /// Generate an absolute path for the given file name. /// e.g /// For file name "part-00000.parquet", the generated location maybe "/table/data/part-00000.parquet" fn generate_location(&self, file_name: &str) -> String; @@ -35,39 +37,28 @@ pub trait FileNameGenerator: Clone + Send + 'static { /// `DefaultFileNameGenerator` used to generate file name for data file. The file name can be /// passed to `LocationGenerator` to generate the location of the file. -/// The rule of file name is aligned with the OutputFileFactory in iceberg-java +/// The file name format is "{prefix}-{file_count}[-{suffix}].{file_format}". #[derive(Clone)] pub struct DefaultFileNameGenerator { - partition_id: u64, - task_id: u64, - // The purpose of this id is to be able to know from two paths that they were written by the - // same operation. - // That's useful, for example, if a Spark job dies and leaves files in the file system, you can - // identify them all - // with a recursive listing and grep. - operator_id: String, + prefix: String, suffix: String, + format: String, file_count: Arc, } impl DefaultFileNameGenerator { /// Create a new `FileNameGenerator`. 
- pub fn new( - partition_id: u64, - task_id: u64, - operator_id: String, - suffix: Option, - ) -> Self { + pub fn new(prefix: String, suffix: Option, format: DataFileFormat) -> Self { let suffix = if let Some(suffix) = suffix { - format!("--{}", suffix) + format!("-{}", suffix) } else { "".to_string() }; + Self { - partition_id, - task_id, - operator_id, + prefix, suffix, + format: format.to_string(), file_count: Arc::new(AtomicU64::new(0)), } } @@ -79,8 +70,8 @@ impl FileNameGenerator for DefaultFileNameGenerator { .file_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed); format!( - "{:05}-{}-{}-{:05}{}", - self.partition_id, self.task_id, self.operator_id, file_id, self.suffix + "{}-{:05}{}.{}", + self.prefix, file_id, self.suffix, self.format ) } } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index a7236654d3..236dfade7c 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -290,7 +290,8 @@ mod tests { let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let loccation_gen = MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); - let file_name_gen = DefaultFileNameGenerator::new(0, 0, "test".to_string(), None); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); // prepare data let schema = { From fe0acb199094cff7069eac481e3bf8036bdcd884 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 1 Mar 2024 13:23:19 +0800 Subject: [PATCH 4/6] add more test --- crates/iceberg/src/spec/table_metadata.rs | 38 ++-- .../writer/file_writer/location_generator.rs | 137 +++++++++++- .../src/writer/file_writer/parquet_writer.rs | 209 ++++++++++++++++++ 3 files changed, 364 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index a6eb05c6c6..0ce3e742b1 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -52,46 +52,46 @@ pub type TableMetadataRef = Arc; /// We check the validity of this data structure when constructing. pub struct TableMetadata { /// Integer Version for the format. - format_version: FormatVersion, + pub(crate) format_version: FormatVersion, /// A UUID that identifies the table - table_uuid: Uuid, + pub(crate) table_uuid: Uuid, /// Location tables base location - location: String, + pub(crate) location: String, /// The tables highest sequence number - last_sequence_number: i64, + pub(crate) last_sequence_number: i64, /// Timestamp in milliseconds from the unix epoch when the table was last updated. - last_updated_ms: i64, + pub(crate) last_updated_ms: i64, /// An integer; the highest assigned column ID for the table. - last_column_id: i32, + pub(crate) last_column_id: i32, /// A list of schemas, stored as objects with schema-id. - schemas: HashMap, + pub(crate) schemas: HashMap, /// ID of the table’s current schema. - current_schema_id: i32, + pub(crate) current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. - partition_specs: HashMap, + pub(crate) partition_specs: HashMap, /// ID of the “current” spec that writers should use by default. - default_spec_id: i32, + pub(crate) default_spec_id: i32, /// An integer; the highest assigned partition field ID across all partition specs for the table. 
- last_partition_id: i32, + pub(crate) last_partition_id: i32, ///A string to string map of table properties. This is used to control settings that /// affect reading and writing and is not intended to be used for arbitrary metadata. /// For example, commit.retry.num-retries is used to control the number of commit retries. - properties: HashMap, + pub(crate) properties: HashMap, /// long ID of the current table snapshot; must be the same as the current /// ID of the main branch in refs. - current_snapshot_id: Option, + pub(crate) current_snapshot_id: Option, ///A list of valid snapshots. Valid snapshots are snapshots for which all /// data files exist in the file system. A data file must not be deleted /// from the file system until the last snapshot in which it was listed is /// garbage collected. - snapshots: HashMap, + pub(crate) snapshots: HashMap, /// A list (optional) of timestamp and snapshot ID pairs that encodes changes /// to the current snapshot for the table. Each time the current-snapshot-id /// is changed, a new entry should be added with the last-updated-ms /// and the new current-snapshot-id. When snapshots are expired from /// the list of valid snapshots, all entries before a snapshot that has /// expired should be removed. - snapshot_log: Vec, + pub(crate) snapshot_log: Vec, /// A list (optional) of timestamp and metadata file location pairs /// that encodes changes to the previous metadata files for the table. @@ -99,19 +99,19 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. - metadata_log: Vec, + pub(crate) metadata_log: Vec, /// A list of sort orders, stored as full sort order objects. - sort_orders: HashMap, + pub(crate) sort_orders: HashMap, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. - default_sort_order_id: i64, + pub(crate) default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. - refs: HashMap, + pub(crate) refs: HashMap, } impl TableMetadata { diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index 929f97dbe8..a86f53ad1f 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -19,7 +19,10 @@ use std::sync::{atomic::AtomicU64, Arc}; -use crate::spec::DataFileFormat; +use crate::{ + spec::{DataFileFormat, TableMetadata}, + Error, ErrorKind, Result, +}; /// `LocationGenerator` used to generate the location of data file. pub trait LocationGenerator: Clone + Send + 'static { @@ -29,6 +32,53 @@ pub trait LocationGenerator: Clone + Send + 'static { fn generate_location(&self, file_name: &str) -> String; } +const WRITE_DATA_LOCATION: &str = "write.data.path"; +const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path"; +const DEFAULT_DATA_DIR: &str = "/data"; + +#[derive(Clone)] +/// `DefaultLocationGenerator` used to generate the data dir location of data file. 
+/// The location is generated based on the table location and the data location in table properties. +pub struct DefaultLocationGenerator { + dir_path: String, +} + +impl DefaultLocationGenerator { + /// Create a new `DefaultLocationGenerator`. + pub fn new(table_metadata: TableMetadata) -> Result { + let table_location = table_metadata.location(); + let rel_dir_path = { + let prop = table_metadata.properties(); + let data_location = prop + .get(WRITE_DATA_LOCATION) + .or(prop.get(WRITE_FOLDER_STORAGE_LOCATION)); + if let Some(data_location) = data_location { + data_location.strip_prefix(table_location).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "data location {} is not a subpath of table location {}", + data_location, table_location + ), + ) + })? + } else { + DEFAULT_DATA_DIR + } + }; + + Ok(Self { + dir_path: format!("{}{}", table_location, rel_dir_path), + }) + } +} + +impl LocationGenerator for DefaultLocationGenerator { + fn generate_location(&self, file_name: &str) -> String { + format!("{}/{}", self.dir_path, file_name) + } +} + /// `FileNameGeneratorTrait` used to generate file name for data file. The file name can be passed to `LocationGenerator` to generate the location of the file. pub trait FileNameGenerator: Clone + Send + 'static { /// Generate a file name. @@ -78,6 +128,17 @@ impl FileNameGenerator for DefaultFileNameGenerator { #[cfg(test)] pub(crate) mod test { + use std::collections::HashMap; + + use uuid::Uuid; + + use crate::{ + spec::{FormatVersion, TableMetadata}, + writer::file_writer::location_generator::{ + FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION, + }, + }; + use super::LocationGenerator; #[derive(Clone)] @@ -96,4 +157,78 @@ pub(crate) mod test { format!("{}/{}", self.root, file_name) } } + + #[test] + fn test_default_location_generate() { + let mut table_metadata = TableMetadata { + format_version: FormatVersion::V2, + table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), + location: "s3://data.db/table".to_string(), + last_updated_ms: 1515100955770, + last_column_id: 1, + schemas: HashMap::new(), + current_schema_id: 1, + partition_specs: HashMap::new(), + default_spec_id: 1, + last_partition_id: 1000, + default_sort_order_id: 0, + sort_orders: HashMap::from_iter(vec![]), + snapshots: HashMap::default(), + current_snapshot_id: None, + last_sequence_number: 1, + properties: HashMap::new(), + snapshot_log: Vec::new(), + metadata_log: vec![], + refs: HashMap::new(), + }; + + let file_name_genertaor = super::DefaultFileNameGenerator::new( + "part".to_string(), + Some("test".to_string()), + crate::spec::DataFileFormat::Parquet, + ); + + // test default data location + let location_generator = + super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap(); + let location = + location_generator.generate_location(&file_name_genertaor.generate_file_name()); + assert_eq!(location, "s3://data.db/table/data/part-00000-test.parquet"); + + // test custom data location + table_metadata.properties.insert( + WRITE_FOLDER_STORAGE_LOCATION.to_string(), + "s3://data.db/table/data_1".to_string(), + ); + let location_generator = + super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap(); + let location = + location_generator.generate_location(&file_name_genertaor.generate_file_name()); + assert_eq!( + location, + "s3://data.db/table/data_1/part-00001-test.parquet" + ); + + table_metadata.properties.insert( + WRITE_DATA_LOCATION.to_string(), + "s3://data.db/table/data_2".to_string(), 
+ ); + let location_generator = + super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap(); + let location = + location_generator.generate_location(&file_name_genertaor.generate_file_name()); + assert_eq!( + location, + "s3://data.db/table/data_2/part-00002-test.parquet" + ); + + // test invalid data location + table_metadata.properties.insert( + WRITE_DATA_LOCATION.to_string(), + // invalid table location + "s3://data.db/data_3".to_string(), + ); + let location_generator = super::DefaultLocationGenerator::new(table_metadata.clone()); + assert!(location_generator.is_err()); + } } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 236dfade7c..73381be8a7 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -266,9 +266,11 @@ mod tests { use std::sync::Arc; use anyhow::Result; + use arrow_array::types::Int64Type; use arrow_array::ArrayRef; use arrow_array::Int64Array; use arrow_array::RecordBatch; + use arrow_array::StructArray; use bytes::Bytes; use futures::AsyncReadExt; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -382,4 +384,211 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_parquet_writer_with_complex_schema() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let loccation_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // prepare data + // Int, Struct(Int), String, List(Int), Struct(Struct(Int)) + let schema = { + let fields = vec![ + arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + arrow_schema::Field::new( + "col1", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Int64, + true, + )] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + ), + arrow_schema::Field::new( + "col3", + arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new( + "item", + arrow_schema::DataType::Int64, + true, + ))), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + arrow_schema::Field::new( + "col4", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + )] + .into(), + ), + true, + )] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + let col1 = Arc::new(StructArray::new( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Int64, + true, + )] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + )); + let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![ + "test"; + 1024 + ])) 
as ArrayRef; + let col3 = Arc::new( + arrow_array::ListArray::from_iter_primitive::(vec![ + Some( + vec![Some(1),] + ); + 1024 + ]), + ) as ArrayRef; + let col4 = Arc::new(StructArray::new( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + )] + .into(), + ), + true, + )] + .into(), + vec![Arc::new(StructArray::new( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + )] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + ))], + None, + )); + let to_write = + RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap(); + + // write data + let mut pw = ParquetWriterBuilder::new( + 0, + WriterProperties::builder().build(), + to_write.schema(), + file_io.clone(), + loccation_gen, + file_name_gen, + ) + .build() + .await?; + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + // Put dummy field for build successfully. + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + + // read the written file + let mut input_file = file_io + .new_input(data_file.file_path.clone()) + .unwrap() + .reader() + .await + .unwrap(); + let mut res = vec![]; + let file_size = input_file.read_to_end(&mut res).await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); + let metadata = reader_builder.metadata().clone(); + + // check data + let mut reader = reader_builder.build().unwrap(); + let res = reader.next().unwrap().unwrap(); + assert_eq!(to_write, res); + + // check metadata + assert_eq!(metadata.num_row_groups(), 1); + assert_eq!(metadata.row_group(0).num_columns(), 5); + assert_eq!(data_file.file_format, DataFileFormat::Parquet); + assert_eq!( + data_file.record_count, + metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64 + ); + assert_eq!(data_file.file_size_in_bytes, file_size as u64); + assert_eq!(data_file.column_sizes.len(), 5); + assert_eq!( + *data_file.column_sizes.get(&0).unwrap(), + metadata.row_group(0).column(0).compressed_size() as u64 + ); + assert_eq!(data_file.value_counts.len(), 5); + data_file + .value_counts + .iter() + .for_each(|(_, v)| assert_eq!(*v, 1024)); + assert_eq!(data_file.null_value_counts.len(), 5); + data_file + .null_value_counts + .iter() + .for_each(|(_, v)| assert_eq!(*v, 0)); + assert_eq!(data_file.key_metadata.len(), 0); + assert_eq!(data_file.split_offsets.len(), 1); + assert_eq!( + *data_file.split_offsets.first().unwrap(), + metadata.row_group(0).file_offset().unwrap() + ); + + Ok(()) + } } From 6dc2b71c2f3d570da8c93294dc438352d8fe2c2e Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 4 Mar 2024 15:20:20 +0800 Subject: [PATCH 5/6] fix typo --- .../src/writer/file_writer/parquet_writer.rs | 88 +++++++++++++------ 1 file changed, 62 insertions(+), 26 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 73381be8a7..bb4550fabe 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -389,7 +389,7 @@ mod tests { async fn test_parquet_writer_with_complex_schema() -> Result<()> { let temp_dir = TempDir::new().unwrap(); let file_io = 
FileIOBuilder::new_fs_io().build().unwrap(); - let loccation_gen = + let location_gen = MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); @@ -410,7 +410,11 @@ mod tests { "sub_col", arrow_schema::DataType::Int64, true, - )] + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] .into(), ), true, @@ -424,11 +428,13 @@ mod tests { ), arrow_schema::Field::new( "col3", - arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new( - "item", - arrow_schema::DataType::Int64, - true, - ))), + arrow_schema::DataType::List(Arc::new( + arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )])), + )), true, ) .with_metadata(HashMap::from([( @@ -445,11 +451,19 @@ mod tests { "sub_sub_col", arrow_schema::DataType::Int64, true, - )] + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] .into(), ), true, - )] + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] .into(), ), true, @@ -463,11 +477,13 @@ mod tests { }; let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; let col1 = Arc::new(StructArray::new( - vec![arrow_schema::Field::new( - "sub_col", - arrow_schema::DataType::Int64, - true, - )] + vec![ + arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )])), + ] .into(), vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], None, @@ -476,14 +492,24 @@ mod tests { "test"; 1024 ])) as ArrayRef; - let col3 = Arc::new( - arrow_array::ListArray::from_iter_primitive::(vec![ + let col3 = Arc::new({ + let list_parts = arrow_array::ListArray::from_iter_primitive::(vec![ Some( vec![Some(1),] ); 1024 - ]), - ) as ArrayRef; + ]) + .into_parts(); + arrow_array::ListArray::new( + Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))), + list_parts.1, + list_parts.2, + list_parts.3, + ) + }) as ArrayRef; let col4 = Arc::new(StructArray::new( vec![arrow_schema::Field::new( "sub_col", @@ -492,18 +518,28 @@ mod tests { "sub_sub_col", arrow_schema::DataType::Int64, true, - )] + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] .into(), ), true, - )] + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] .into(), vec![Arc::new(StructArray::new( - vec![arrow_schema::Field::new( - "sub_sub_col", - arrow_schema::DataType::Int64, - true, - )] + vec![ + arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )])), + ] .into(), vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], None, @@ -519,7 +555,7 @@ mod tests { WriterProperties::builder().build(), to_write.schema(), file_io.clone(), - loccation_gen, + location_gen, file_name_gen, ) .build() From 820aa4164a84e20655e4249a56b97c0acbf4a75b Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sat, 9 Mar 2024 20:46:10 +0800 Subject: [PATCH 6/6] fix conflict --- Cargo.toml | 1 - crates/iceberg/src/io.rs | 2 +- 
crates/iceberg/src/scan.rs | 15 +++++++++------ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9935a607ca..3234bd07a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,6 @@ ordered-float = "4.0.0" parquet = "50" pretty_assertions = "1.4.0" port_scanner = "0.1.5" -parquet = { version = ">=46", features = ["async"] } reqwest = { version = "^0.11", features = ["json"] } rust_decimal = "1.31.0" serde = { version = "^1.0", features = ["rc"] } diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs index d1bde2a0a6..d3f07cb648 100644 --- a/crates/iceberg/src/io.rs +++ b/crates/iceberg/src/io.rs @@ -54,8 +54,8 @@ use crate::{error::Result, Error, ErrorKind}; use futures::{AsyncRead, AsyncSeek, AsyncWrite}; use once_cell::sync::Lazy; use opendal::{Operator, Scheme}; -use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek}; use tokio::io::AsyncWrite as TokioAsyncWrite; +use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek}; use url::Url; /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3). diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 3d2cbfae34..37fde8f519 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -314,14 +314,15 @@ mod tests { ManifestEntry::builder() .status(ManifestStatus::Added) .data_file( - DataFile::builder() + DataFileBuilder::default() .content(DataContentType::Data) .file_path(format!("{}/1.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) .file_size_in_bytes(100) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(100))])) - .build(), + .build() + .unwrap(), ) .build(), ManifestEntry::builder() @@ -330,14 +331,15 @@ mod tests { .sequence_number(parent_snapshot.sequence_number()) .file_sequence_number(parent_snapshot.sequence_number()) .data_file( - DataFile::builder() + DataFileBuilder::default() .content(DataContentType::Data) .file_path(format!("{}/2.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) .file_size_in_bytes(100) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(200))])) - .build(), + .build() + .unwrap(), ) .build(), ManifestEntry::builder() @@ -346,14 +348,15 @@ mod tests { .sequence_number(parent_snapshot.sequence_number()) .file_sequence_number(parent_snapshot.sequence_number()) .data_file( - DataFile::builder() + DataFileBuilder::default() .content(DataContentType::Data) .file_path(format!("{}/3.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) .file_size_in_bytes(100) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(300))])) - .build(), + .build() + .unwrap(), ) .build(), ],
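
A few usage sketches for the API this series ends up with. First, the `DataFile` change in patch 1: replacing `TypedBuilder` with `derive_builder`'s `Builder` means callers now go through `DataFileBuilder::default()` and a fallible `build()`, as the updated `scan.rs` tests show. Below is a minimal sketch of filling one in by hand, assuming `DataFileBuilder` and the other `spec` types are re-exported from `iceberg::spec` the same way the in-crate tests import them from `crate::spec`; the path and sizes are illustrative only.

```rust
use iceberg::spec::{DataContentType, DataFile, DataFileBuilder, DataFileFormat, Struct};

fn build_example_data_file() -> DataFile {
    DataFileBuilder::default()
        // Required fields: build() returns an error if any of them is unset.
        .content(DataContentType::Data)
        .file_path("s3://bucket/table/data/part-00000.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .partition(Struct::empty())
        .record_count(1024)
        .file_size_in_bytes(4096)
        // The metric maps (column_sizes, value_counts, ...) and split_offsets
        // carry #[builder(default)], so they can be left out here.
        .build()
        .expect("all required DataFile fields were set")
}
```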
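The `TrackWriter` in patch 1 is worth a second look because the pattern is broadly useful: wrap any `tokio::io::AsyncWrite`, forward the three poll methods, and keep a shared atomic counter that `CurrentFileStatus` (or anything else) can read while writing is still in progress. Here is a generic sketch of that idea, not the patch's exact type; it counts the bytes the inner writer reports as accepted on each `poll_write`.

```rust
use std::{
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::{Context, Poll},
};

use tokio::io::AsyncWrite;

/// Counts how many bytes the inner writer accepted; the shared counter can be
/// read concurrently, which is what a "current written size" query needs.
struct CountingWriter<W> {
    inner: W,
    written: Arc<AtomicU64>,
}

impl<W: AsyncWrite + Unpin> AsyncWrite for CountingWriter<W> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        match Pin::new(&mut self.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(n)) => {
                // Record the bytes the inner writer actually accepted.
                self.written.fetch_add(n as u64, Ordering::Relaxed);
                Poll::Ready(Ok(n))
            }
            other => other,
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}
```

Wrapping the boxed `FileWrite` returned by `OutputFile::writer()` with an adapter like this is how the parquet writer above learns its `current_written_size` without needing exclusive access to the underlying writer.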
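Patches 2 and 3 split file placement into two small traits, `LocationGenerator` and `FileNameGenerator`, each needing only `Clone + Send + 'static` and one method. That keeps the parquet writer testable (see `MockLocationGenerator` above) and lets callers bypass `DefaultLocationGenerator` when they do not have `TableMetadata` at hand. The sketch below shows a pair of custom implementations, assuming the module is reachable at `iceberg::writer::file_writer::location_generator` as declared in `mod.rs`; the directory and naming scheme are made up for illustration.

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use iceberg::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};

/// Places every file under one fixed directory.
#[derive(Clone)]
struct FixedDirLocationGenerator {
    dir: String,
}

impl LocationGenerator for FixedDirLocationGenerator {
    fn generate_location(&self, file_name: &str) -> String {
        format!("{}/{}", self.dir.trim_end_matches('/'), file_name)
    }
}

/// Names files "data-<counter>.parquet"; the counter is shared through an Arc
/// so cloned generators keep producing unique names.
#[derive(Clone)]
struct CountingFileNameGenerator {
    count: Arc<AtomicU64>,
}

impl FileNameGenerator for CountingFileNameGenerator {
    fn generate_file_name(&self) -> String {
        let id = self.count.fetch_add(1, Ordering::Relaxed);
        format!("data-{id:05}.parquet")
    }
}

fn main() {
    let loc = FixedDirLocationGenerator {
        dir: "s3://bucket/table/data".to_string(),
    };
    let name = CountingFileNameGenerator {
        count: Arc::new(AtomicU64::new(0)),
    };
    // Prints "s3://bucket/table/data/data-00000.parquet".
    println!("{}", loc.generate_location(&name.generate_file_name()));
}
```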
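Finally, the shape of a full write with the builder as it stands after patch 4: `ParquetWriterBuilder` owns the `FileIO` and both generators, `build()` opens the output file itself, and `close()` hands back `DataFileBuilder`s that still need `content` and `partition` before they become `DataFile`s. The helper below sketches that flow, assuming the `writer::file_writer` items and `spec` types are publicly re-exported as the in-crate tests use them, and that the caller already has `TableMetadata` (e.g. from a catalog) plus a `RecordBatch` whose Arrow fields all carry `PARQUET_FIELD_ID_META_KEY` metadata.

```rust
use arrow_array::RecordBatch;
use iceberg::io::FileIOBuilder;
use iceberg::spec::{DataContentType, DataFile, DataFileFormat, Struct, TableMetadata};
use iceberg::writer::file_writer::{
    location_generator::{DefaultFileNameGenerator, DefaultLocationGenerator},
    FileWriter, FileWriterBuilder, ParquetWriterBuilder,
};
use parquet::file::properties::WriterProperties;

async fn write_batch(
    table_metadata: TableMetadata,
    batch: RecordBatch,
) -> iceberg::Result<Vec<DataFile>> {
    // A local-fs FileIO keeps the sketch self-contained; any FileIO works.
    let file_io = FileIOBuilder::new_fs_io().build()?;
    // Resolves the data dir from the table location and the
    // write.data.path / write.folder-storage.path properties.
    let location_gen = DefaultLocationGenerator::new(table_metadata)?;
    // Produces names like "part-00000.parquet".
    let file_name_gen =
        DefaultFileNameGenerator::new("part".to_string(), None, DataFileFormat::Parquet);

    let mut writer = ParquetWriterBuilder::new(
        0, // clamped up to the 8 MB minimum buffer internally
        WriterProperties::builder().build(),
        batch.schema(),
        file_io,
        location_gen,
        file_name_gen,
    )
    .build()
    .await?;

    writer.write(&batch).await?;

    // close() fills in path, format, metrics and split offsets; content and
    // partition are still the caller's responsibility, as in the tests above.
    writer
        .close()
        .await?
        .into_iter()
        .map(|mut builder| {
            builder
                .content(DataContentType::Data)
                .partition(Struct::empty())
                .build()
                .map_err(|e| iceberg::Error::new(iceberg::ErrorKind::Unexpected, e.to_string()))
        })
        .collect()
}
```

Keeping `content` and `partition` out of what `close()` returns leaves the file writer format-only, presumably so the same `ParquetWriter` can back both data files and delete files in the layers built on top of it.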