diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs index 931f9441e9..d7455b56fe 100644 --- a/crates/iceberg/src/spec/manifest/data_file.rs +++ b/crates/iceberg/src/spec/manifest/data_file.rs @@ -26,7 +26,7 @@ use serde_with::{DeserializeFromStr, SerializeDisplay}; use super::_serde::DataFileSerde; use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2}; use crate::error::Result; -use crate::spec::{Struct, StructType}; +use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType}; use crate::{Error, ErrorKind}; /// Data file carries data file path, partition tuple, metrics, … @@ -49,6 +49,7 @@ pub struct DataFile { /// /// Partition data tuple, schema based on the partition spec output using /// partition field ids for the struct field ids + #[builder(default = "Struct::empty()")] pub(crate) partition: Struct, /// field id: 103 /// @@ -156,6 +157,7 @@ pub struct DataFile { pub(crate) first_row_id: Option, /// This field is not included in spec. It is just store in memory representation used /// in process. + #[builder(default = "DEFAULT_PARTITION_SPEC_ID")] pub(crate) partition_spec_id: i32, /// field id: 143 /// diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index fff9b62d78..128db338af 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -194,6 +194,15 @@ impl PartitionKey { Self { spec, schema, data } } + /// Creates a new partition key from another partition key, with a new data field. + pub fn copy_with_data(&self, data: Struct) -> Self { + Self { + spec: self.spec.clone(), + schema: self.schema.clone(), + data, + } + } + /// Generates a partition path based on the partition values. pub fn to_path(&self) -> String { self.spec.partition_to_path(&self.data, self.schema.clone()) @@ -207,6 +216,21 @@ impl PartitionKey { Some(pk) => pk.spec.is_unpartitioned(), } } + + /// Returns the associated [`PartitionSpec`]. + pub fn spec(&self) -> &PartitionSpec { + &self.spec + } + + /// Returns the associated [`SchemaRef`]. + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// Returns the associated [`Struct`]. + pub fn data(&self) -> &Struct { + &self.data + } } /// Reference to [`UnboundPartitionSpec`]. diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index f215673df1..a950547d33 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -18,86 +18,126 @@ //! This module provide `DataFileWriter`. use arrow_array::RecordBatch; -use itertools::Itertools; -use crate::Result; -use crate::spec::{DataContentType, DataFile, Struct}; -use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::spec::{DataContentType, DataFile, PartitionKey}; +use crate::writer::file_writer::FileWriterBuilder; +use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; +use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder}; use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; /// Builder for `DataFileWriter`. 
#[derive(Clone, Debug)] -pub struct DataFileWriterBuilder { - inner: B, - partition_value: Option, - partition_spec_id: i32, +pub struct DataFileWriterBuilder { + inner: RollingFileWriterBuilder, + partition_key: Option, } -impl DataFileWriterBuilder { - /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`. - pub fn new(inner: B, partition_value: Option, partition_spec_id: i32) -> Self { +impl DataFileWriterBuilder +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + /// Create a new `DataFileWriterBuilder` using a `RollingFileWriterBuilder`. + pub fn new( + inner_builder: RollingFileWriterBuilder, + partition_key: Option, + ) -> Self { Self { - inner, - partition_value, - partition_spec_id, + inner: inner_builder, + partition_key, } } } #[async_trait::async_trait] -impl IcebergWriterBuilder for DataFileWriterBuilder { - type R = DataFileWriter; +impl IcebergWriterBuilder for DataFileWriterBuilder +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + type R = DataFileWriter; async fn build(self) -> Result { Ok(DataFileWriter { - inner_writer: Some(self.inner.clone().build().await?), - partition_value: self.partition_value.unwrap_or(Struct::empty()), - partition_spec_id: self.partition_spec_id, + inner: Some(self.inner.clone().build()), + partition_key: self.partition_key, }) } } /// A writer write data is within one spec/partition. #[derive(Debug)] -pub struct DataFileWriter { - inner_writer: Option, - partition_value: Struct, - partition_spec_id: i32, +pub struct DataFileWriter { + inner: Option>, + partition_key: Option, } #[async_trait::async_trait] -impl IcebergWriter for DataFileWriter { +impl IcebergWriter for DataFileWriter +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ async fn write(&mut self, batch: RecordBatch) -> Result<()> { - self.inner_writer.as_mut().unwrap().write(&batch).await + if let Some(writer) = self.inner.as_mut() { + writer.write(&self.partition_key, &batch).await + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Writer is not initialized!", + )) + } } async fn close(&mut self) -> Result> { - let writer = self.inner_writer.take().unwrap(); - Ok(writer - .close() - .await? - .into_iter() - .map(|mut res| { - res.content(DataContentType::Data); - res.partition(self.partition_value.clone()); - res.partition_spec_id(self.partition_spec_id); - res.build().expect("Guaranteed to be valid") - }) - .collect_vec()) + if let Some(writer) = self.inner.take() { + writer + .close() + .await? 
+ .into_iter() + .map(|mut res| { + res.content(DataContentType::Data); + if let Some(pk) = self.partition_key.as_ref() { + res.partition(pk.data().clone()); + res.partition_spec_id(pk.spec().spec_id()); + } + res.build().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to build data file: {}", e), + ) + }) + }) + .collect() + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Data file writer has been closed.", + )) + } } } -impl CurrentFileStatus for DataFileWriter { +impl CurrentFileStatus for DataFileWriter +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ fn current_file_path(&self) -> String { - self.inner_writer.as_ref().unwrap().current_file_path() + self.inner.as_ref().unwrap().current_file_path() } fn current_row_num(&self) -> usize { - self.inner_writer.as_ref().unwrap().current_row_num() + self.inner.as_ref().unwrap().current_row_num() } fn current_written_size(&self) -> usize { - self.inner_writer.as_ref().unwrap().current_written_size() + self.inner.as_ref().unwrap().current_written_size() } } @@ -116,13 +156,15 @@ mod test { use crate::Result; use crate::io::FileIOBuilder; use crate::spec::{ - DataContentType, DataFileFormat, Literal, NestedField, PrimitiveType, Schema, Struct, Type, + DataContentType, DataFileFormat, Literal, NestedField, PartitionKey, PartitionSpec, + PrimitiveType, Schema, Struct, Type, }; use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; use crate::writer::file_writer::ParquetWriterBuilder; use crate::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch}; #[tokio::test] @@ -143,16 +185,16 @@ mod test { ]) .build()?; - let pw = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(schema), - None, + let pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)); + + let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pw, file_io.clone(), location_gen, file_name_gen, ); - let mut data_file_writer = DataFileWriterBuilder::new(pw, None, 0) + let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder, None) .build() .await .unwrap(); @@ -219,20 +261,27 @@ mod test { NestedField::required(6, "name", Type::Primitive(PrimitiveType::String)).into(), ]) .build()?; + let schema_ref = Arc::new(schema); let partition_value = Struct::from_iter([Some(Literal::int(1))]); + let partition_key = PartitionKey::new( + PartitionSpec::builder(schema_ref.clone()).build()?, + schema_ref.clone(), + partition_value.clone(), + ); - let parquet_writer_builder = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(schema.clone()), - None, + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema_ref.clone()); + + let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, file_io.clone(), location_gen, file_name_gen, ); let mut data_file_writer = - DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()), 0) + DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)) .build() .await?; diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 
765ff1cacd..6740ed435c 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -26,21 +26,35 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::record_batch_projector::RecordBatchProjector; use crate::arrow::schema_to_arrow_schema; -use crate::spec::{DataFile, SchemaRef, Struct}; -use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::spec::{DataFile, PartitionKey, SchemaRef}; +use crate::writer::file_writer::FileWriterBuilder; +use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; +use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; /// Builder for `EqualityDeleteWriter`. #[derive(Clone, Debug)] -pub struct EqualityDeleteFileWriterBuilder { - inner: B, +pub struct EqualityDeleteFileWriterBuilder< + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +> { + inner: RollingFileWriterBuilder, config: EqualityDeleteWriterConfig, } -impl EqualityDeleteFileWriterBuilder { - /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`. - pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self { +impl EqualityDeleteFileWriterBuilder +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + /// Create a new `EqualityDeleteFileWriterBuilder` using a `RollingFileWriterBuilder`. + pub fn new( + inner: RollingFileWriterBuilder, + config: EqualityDeleteWriterConfig, + ) -> Self { Self { inner, config } } } @@ -52,8 +66,7 @@ pub struct EqualityDeleteWriterConfig { equality_ids: Vec, // Projector used to project the data chunk into specific fields. projector: RecordBatchProjector, - partition_value: Struct, - partition_spec_id: i32, + partition_key: Option, } impl EqualityDeleteWriterConfig { @@ -61,8 +74,7 @@ impl EqualityDeleteWriterConfig { pub fn new( equality_ids: Vec, original_schema: SchemaRef, - partition_value: Option, - partition_spec_id: i32, + partition_key: Option, ) -> Result { let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?); let projector = RecordBatchProjector::new( @@ -98,8 +110,7 @@ impl EqualityDeleteWriterConfig { Ok(Self { equality_ids, projector, - partition_value: partition_value.unwrap_or(Struct::empty()), - partition_spec_id, + partition_key, }) } @@ -110,36 +121,48 @@ impl EqualityDeleteWriterConfig { } #[async_trait::async_trait] -impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder { - type R = EqualityDeleteFileWriter; +impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + type R = EqualityDeleteFileWriter; async fn build(self) -> Result { Ok(EqualityDeleteFileWriter { - inner_writer: Some(self.inner.clone().build().await?), + inner: Some(self.inner.clone().build()), // todo revisit this, probably still need a builder for rolling writer projector: self.config.projector, equality_ids: self.config.equality_ids, - partition_value: self.config.partition_value, - partition_spec_id: self.config.partition_spec_id, + partition_key: self.config.partition_key, }) } } /// Writer used to write equality delete files. 
#[derive(Debug)] -pub struct EqualityDeleteFileWriter { - inner_writer: Option, +pub struct EqualityDeleteFileWriter< + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +> { + inner: Option>, projector: RecordBatchProjector, equality_ids: Vec, - partition_value: Struct, - partition_spec_id: i32, + partition_key: Option, } #[async_trait::async_trait] -impl IcebergWriter for EqualityDeleteFileWriter { +impl IcebergWriter for EqualityDeleteFileWriter +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ async fn write(&mut self, batch: RecordBatch) -> Result<()> { let batch = self.projector.project_batch(batch)?; - if let Some(writer) = self.inner_writer.as_mut() { - writer.write(&batch).await + if let Some(writer) = self.inner.as_mut() { + writer.write(&self.partition_key, &batch).await } else { Err(Error::new( ErrorKind::Unexpected, @@ -149,19 +172,26 @@ impl IcebergWriter for EqualityDeleteFileWriter { } async fn close(&mut self) -> Result> { - if let Some(writer) = self.inner_writer.take() { - Ok(writer + if let Some(writer) = self.inner.take() { + writer .close() .await? .into_iter() .map(|mut res| { res.content(crate::spec::DataContentType::EqualityDeletes); res.equality_ids(Some(self.equality_ids.iter().copied().collect_vec())); - res.partition(self.partition_value.clone()); - res.partition_spec_id(self.partition_spec_id); - res.build().expect("msg") + if let Some(pk) = self.partition_key.as_ref() { + res.partition(pk.data().clone()); + res.partition_spec_id(pk.spec().spec_id()); + } + res.build().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to build data file: {}", e), + ) + }) }) - .collect_vec()) + .collect() } else { Err(Error::new( ErrorKind::Unexpected, @@ -201,6 +231,7 @@ mod test { use crate::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; async fn check_parquet_data_file_with_equality_delete_write( @@ -397,23 +428,24 @@ mod test { let equality_ids = vec![0_i32, 8]; let equality_config = - EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, 0).unwrap(); + EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); let delete_schema = arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap(); let projector = equality_config.projector.clone(); // prepare writer - let pb = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(delete_schema), - None, + let pb = + ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(delete_schema)); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pb, file_io.clone(), location_gen, file_name_gen, ); - let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, equality_config) - .build() - .await?; + let mut equality_delete_writer = + EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config) + .build() + .await?; // write equality_delete_writer.write(to_write.clone()).await?; @@ -499,19 +531,19 @@ mod test { .unwrap(), ); // Float and Double are not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None, 0).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 0).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), 
None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err()); // Struct is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None, 0).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err()); // Nested field of struct is allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None, 0).is_ok()); + assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok()); // Nested field of map is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None, 0).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None, 0).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None, 0).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err()); // Nested field of list is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None, 0).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None, 0).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err()); Ok(()) } @@ -565,22 +597,22 @@ mod test { .unwrap(), ); let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]; - let config = - EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None, 0).unwrap(); + let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None).unwrap(); let delete_arrow_schema = config.projected_arrow_schema_ref().clone(); let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap(); - let pb = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(delete_schema), - None, + let pb = + ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(delete_schema)); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pb, file_io.clone(), location_gen, file_name_gen, ); - let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, config) - .build() - .await?; + let mut equality_delete_writer = + EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config) + .build() + .await?; // prepare data let col0 = Arc::new(BooleanArray::from(vec![ @@ -763,7 +795,7 @@ mod test { let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); let equality_ids = vec![0_i32, 2, 5]; let equality_config = - EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, 0).unwrap(); + EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); let projector = equality_config.projector.clone(); // check diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 2a5a735534..2ed6414ce8 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -27,6 +27,8 @@ use crate::spec::DataFileBuilder; mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; +use crate::io::OutputFile; + pub mod location_generator; /// Module providing writers that can 
automatically roll over to new files based on size thresholds. pub mod rolling_writer; @@ -38,7 +40,7 @@ pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. - fn build(self) -> impl Future> + Send; + fn build(self, output_file: OutputFile) -> impl Future> + Send; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 0a8a095ea8..620f27df33 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -34,7 +34,6 @@ use parquet::format::FileMetaData; use parquet::thrift::{TCompactOutputProtocol, TSerializable}; use thrift::protocol::TOutputProtocol; -use super::location_generator::{FileNameGenerator, LocationGenerator}; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, @@ -43,8 +42,8 @@ use crate::arrow::{ use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, - NestedFieldRef, PartitionKey, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, - Struct, StructType, TableMetadata, Type, visit_schema, + NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, + StructType, TableMetadata, Type, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -52,78 +51,43 @@ use crate::{Error, ErrorKind, Result}; /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] -pub struct ParquetWriterBuilder { +pub struct ParquetWriterBuilder { props: WriterProperties, schema: SchemaRef, - partition_key: Option, match_mode: FieldMatchMode, - - file_io: FileIO, - location_generator: T, - file_name_generator: F, } -impl ParquetWriterBuilder { +impl ParquetWriterBuilder { /// Create a new `ParquetWriterBuilder` /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field. 
- pub fn new( - props: WriterProperties, - schema: SchemaRef, - partition_key: Option, - file_io: FileIO, - location_generator: T, - file_name_generator: F, - ) -> Self { - Self::new_with_match_mode( - props, - schema, - partition_key, - FieldMatchMode::Id, - file_io, - location_generator, - file_name_generator, - ) + pub fn new(props: WriterProperties, schema: SchemaRef) -> Self { + Self::new_with_match_mode(props, schema, FieldMatchMode::Id) } /// Create a new `ParquetWriterBuilder` with custom match mode pub fn new_with_match_mode( props: WriterProperties, schema: SchemaRef, - partition_key: Option, match_mode: FieldMatchMode, - file_io: FileIO, - location_generator: T, - file_name_generator: F, ) -> Self { Self { props, schema, - partition_key, match_mode, - file_io, - location_generator, - file_name_generator, } } } -impl FileWriterBuilder for ParquetWriterBuilder { +impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; - async fn build(self) -> Result { - let out_file = self - .file_io - .new_output(self.location_generator.generate_location( - self.partition_key.as_ref(), - &self.file_name_generator.generate_file_name(), - ))?; - + async fn build(self, output_file: OutputFile) -> Result { Ok(ParquetWriter { schema: self.schema.clone(), inner_writer: None, writer_properties: self.props, current_row_num: 0, - out_file, + output_file, nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), }) } @@ -250,7 +214,7 @@ impl SchemaVisitor for IndexByParquetPathName { /// `ParquetWriter`` is used to write arrow data into parquet file on storage. pub struct ParquetWriter { schema: SchemaRef, - out_file: OutputFile, + output_file: OutputFile, inner_writer: Option>>>, writer_properties: WriterProperties, current_row_num: usize, @@ -555,7 +519,7 @@ impl FileWriter for ParquetWriter { writer } else { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); - let inner_writer = self.out_file.writer().await?; + let inner_writer = self.output_file.writer().await?; let async_writer = AsyncFileWriter::new(inner_writer); let writer = AsyncArrowWriter::try_new( async_writer, @@ -594,7 +558,7 @@ impl FileWriter for ParquetWriter { let written_size = writer.bytes_written(); if self.current_row_num == 0 { - self.out_file.delete().await.map_err(|err| { + self.output_file.delete().await.map_err(|err| { Error::new( ErrorKind::Unexpected, "Failed to delete empty parquet file.", @@ -616,7 +580,7 @@ impl FileWriter for ParquetWriter { self.schema, parquet_metadata, written_size, - self.out_file.location().to_string(), + self.output_file.location().to_string(), self.nan_value_count_visitor.nan_value_counts, )?]) } @@ -625,7 +589,7 @@ impl FileWriter for ParquetWriter { impl CurrentFileStatus for ParquetWriter { fn current_file_path(&self) -> String { - self.out_file.location().to_string() + self.output_file.location().to_string() } fn current_row_num(&self) -> usize { @@ -703,7 +667,7 @@ mod tests { use crate::io::FileIOBuilder; use crate::spec::{PrimitiveLiteral, Struct, *}; use crate::writer::file_writer::location_generator::{ - DefaultFileNameGenerator, DefaultLocationGenerator, + DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator, }; use crate::writer::tests::check_parquet_data_file; @@ -871,11 +835,13 @@ mod tests { // prepare data let schema = { - let fields = vec![ - arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata( - 
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]), - ), - ]; + let fields = + vec![ + Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + ]; Arc::new(arrow_schema::Schema::new(fields)) }; let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef; @@ -883,18 +849,18 @@ mod tests { let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap(); let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap(); + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + // write data let mut pw = ParquetWriterBuilder::new( WriterProperties::builder() .set_max_row_group_size(128) .build(), Arc::new(to_write.schema().as_ref().try_into().unwrap()), - None, - file_io.clone(), - location_gen, - file_name_gen, ) - .build() + .build(output_file) .await?; pw.write(&to_write).await?; pw.write(&to_write_null).await?; @@ -905,7 +871,7 @@ mod tests { .next() .unwrap() // Put dummy field for build successfully. - .content(crate::spec::DataContentType::Data) + .content(DataContentType::Data) .partition(Struct::empty()) .partition_spec_id(0) .build() @@ -1006,7 +972,7 @@ mod tests { None, )); let col5 = Arc::new({ - let mut map_array_builder = arrow_array::builder::MapBuilder::new( + let mut map_array_builder = MapBuilder::new( None, arrow_array::builder::StringBuilder::new(), arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::< @@ -1083,18 +1049,15 @@ mod tests { col0, col1, col2, col3, col4, col5, ]) .unwrap(); + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; // write data - let mut pw = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(schema), - None, - file_io.clone(), - location_gen, - file_name_gen, - ) - .build() - .await?; + let mut pw = + ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)) + .build(output_file) + .await?; pw.write(&to_write).await?; let res = pw.close().await?; assert_eq!(res.len(), 1); @@ -1161,7 +1124,7 @@ mod tests { async fn test_all_type_for_write() -> Result<()> { let temp_dir = TempDir::new().unwrap(); let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - let loccation_gen = DefaultLocationGenerator::with_data_location( + let location_gen = DefaultLocationGenerator::with_data_location( temp_dir.path().to_str().unwrap().to_string(), ); let file_name_gen = @@ -1276,18 +1239,15 @@ mod tests { col14, col15, col16, ]) .unwrap(); + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; // write data - let mut pw = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(schema), - None, - file_io.clone(), - loccation_gen, - file_name_gen, - ) - .build() - .await?; + let mut pw = + ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)) + .build(output_file) + .await?; pw.write(&to_write).await?; let res = pw.close().await?; assert_eq!(res.len(), 1); @@ -1405,7 +1365,7 @@ mod tests { async fn test_decimal_bound() -> Result<()> { let temp_dir = TempDir::new().unwrap(); let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - let loccation_gen = DefaultLocationGenerator::with_data_location( + let location_gen = DefaultLocationGenerator::with_data_location( 
temp_dir.path().to_str().unwrap().to_string(), ); let file_name_gen = @@ -1429,16 +1389,12 @@ mod tests { .unwrap(), ); let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); - let mut pw = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - schema.clone(), - None, - file_io.clone(), - loccation_gen.clone(), - file_name_gen.clone(), - ) - .build() - .await?; + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()) + .build(output_file) + .await?; let col0 = Arc::new( Decimal128Array::from(vec![Some(22000000000), Some(11000000000)]) .with_data_type(DataType::Decimal128(28, 10)), @@ -1485,16 +1441,12 @@ mod tests { .unwrap(), ); let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); - let mut pw = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - schema.clone(), - None, - file_io.clone(), - loccation_gen.clone(), - file_name_gen.clone(), - ) - .build() - .await?; + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()) + .build(output_file) + .await?; let col0 = Arc::new( Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)]) .with_data_type(DataType::Decimal128(28, 10)), @@ -1544,16 +1496,12 @@ mod tests { .unwrap(), ); let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); - let mut pw = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - schema, - None, - file_io.clone(), - loccation_gen, - file_name_gen, - ) - .build() - .await?; + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema) + .build(output_file) + .await?; let col0 = Arc::new( Decimal128Array::from(vec![ Some(decimal_max.mantissa()), @@ -1676,37 +1624,31 @@ mod tests { }; let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef; let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap(); + let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name()); + let output_file = file_io.new_output(&file_path)?; let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), Arc::new(to_write.schema().as_ref().try_into().unwrap()), - None, - file_io.clone(), - location_gen.clone(), - file_name_gen, ) - .build() + .build(output_file) .await?; pw.write(&to_write).await?; - let file_path = pw.out_file.location().to_string(); pw.close().await.unwrap(); - assert!(file_io.exists(file_path).await.unwrap()); + assert!(file_io.exists(&file_path).await.unwrap()); // Test that file will not create if no data to write let file_name_gen = DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet); + let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name()); + let output_file = file_io.new_output(&file_path)?; let pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), Arc::new(to_write.schema().as_ref().try_into().unwrap()), - None, - file_io.clone(), - location_gen, - file_name_gen, ) - .build() + .build(output_file) .await?; - let file_path = pw.out_file.location().to_string(); 
pw.close().await.unwrap(); - assert!(!file_io.exists(file_path).await.unwrap()); + assert!(!file_io.exists(&file_path).await.unwrap()); Ok(()) } @@ -1746,17 +1688,16 @@ mod tests { let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap(); + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; // write data let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), Arc::new(to_write.schema().as_ref().try_into().unwrap()), - None, - file_io.clone(), - location_gen, - file_name_gen, ) - .build() + .build(output_file) .await?; pw.write(&to_write).await?; @@ -1887,17 +1828,16 @@ mod tests { struct_nested_float_field_col, ]) .unwrap(); + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; // write data let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), Arc::new(to_write.schema().as_ref().try_into().unwrap()), - None, - file_io.clone(), - location_gen, - file_name_gen, ) - .build() + .build(output_file) .await?; pw.write(&to_write).await?; @@ -2047,6 +1987,9 @@ mod tests { // large_list_float_field_col, ]) .expect("Could not form record batch"); + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; // write data let mut pw = ParquetWriterBuilder::new( @@ -2058,12 +2001,8 @@ mod tests { .try_into() .expect("Could not convert iceberg schema"), ), - None, - file_io.clone(), - location_gen, - file_name_gen, ) - .build() + .build(output_file) .await?; pw.write(&to_write).await?; @@ -2229,6 +2168,9 @@ mod tests { struct_list_float_field_col, ]) .expect("Could not form record batch"); + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; // write data let mut pw = ParquetWriterBuilder::new( @@ -2240,12 +2182,8 @@ mod tests { .try_into() .expect("Could not convert iceberg schema"), ), - None, - file_io.clone(), - location_gen, - file_name_gen, ) - .build() + .build(output_file) .await?; pw.write(&to_write).await?; @@ -2310,6 +2248,9 @@ mod tests { ); let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let output_file = file_io + .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name())) + .unwrap(); // write data let pw = ParquetWriterBuilder::new( @@ -2325,12 +2266,8 @@ mod tests { .build() .expect("Failed to create schema"), ), - None, - file_io.clone(), - location_gen, - file_name_gen, ) - .build() + .build(output_file) .await .unwrap(); diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 181205c371..0b9b105c5e 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -15,50 +15,104 @@ // specific language governing permissions and limitations // under the License. 
+use std::fmt::{Debug, Formatter}; + use arrow_array::RecordBatch; -use crate::spec::DataFileBuilder; +use crate::io::{FileIO, OutputFile}; +use crate::spec::{DataFileBuilder, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey}; use crate::writer::CurrentFileStatus; +use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::{Error, ErrorKind, Result}; -/// Builder for creating a `RollingFileWriter` that rolls over to a new file -/// when the data size exceeds a target threshold. -#[derive(Clone)] -pub struct RollingFileWriterBuilder { +/// Builder for [`RollingFileWriter`]. +#[derive(Clone, Debug)] +pub struct RollingFileWriterBuilder< + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +> { inner_builder: B, target_file_size: usize, + file_io: FileIO, + location_generator: L, + file_name_generator: F, } -impl RollingFileWriterBuilder { - /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size. +impl RollingFileWriterBuilder +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + /// Creates a new `RollingFileWriterBuilder` with the specified target file size. /// - /// # Arguments + /// # Parameters /// /// * `inner_builder` - The builder for the underlying file writer - /// * `target_file_size` - The target size in bytes before rolling over to a new file + /// * `target_file_size` - The target file size in bytes that triggers rollover + /// * `file_io` - The file IO interface for creating output files + /// * `location_generator` - Generator for file locations + /// * `file_name_generator` - Generator for file names + /// + /// # Returns /// - /// NOTE: The `target_file_size` does not exactly reflect the final size on physical storage. - /// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior. - /// The actual file size on disk is expected to be slightly larger than `target_file_size`. - pub fn new(inner_builder: B, target_file_size: usize) -> Self { + /// A new `RollingFileWriterBuilder` instance + pub fn new( + inner_builder: B, + target_file_size: usize, + file_io: FileIO, + location_generator: L, + file_name_generator: F, + ) -> Self { Self { inner_builder, target_file_size, + file_io, + location_generator, + file_name_generator, } } -} -impl FileWriterBuilder for RollingFileWriterBuilder { - type R = RollingFileWriter; + /// Creates a new `RollingFileWriterBuilder` with the default target file size. + /// + /// # Parameters + /// + /// * `inner_builder` - The builder for the underlying file writer + /// * `file_io` - The file IO interface for creating output files + /// * `location_generator` - Generator for file locations + /// * `file_name_generator` - Generator for file names + /// + /// # Returns + /// + /// A new `RollingFileWriterBuilder` instance with default target file size + pub fn new_with_default_file_size( + inner_builder: B, + file_io: FileIO, + location_generator: L, + file_name_generator: F, + ) -> Self { + Self { + inner_builder, + target_file_size: PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + file_io, + location_generator, + file_name_generator, + } + } - async fn build(self) -> Result { - Ok(RollingFileWriter { + /// Build a new [`RollingFileWriter`]. 
+ pub fn build(self) -> RollingFileWriter { + RollingFileWriter { inner: None, inner_builder: self.inner_builder, target_file_size: self.target_file_size, data_file_builders: vec![], - }) + file_io: self.file_io, + location_generator: self.location_generator, + file_name_generator: self.file_name_generator, + } } } @@ -68,14 +122,36 @@ impl FileWriterBuilder for RollingFileWriterBuilder { /// This writer wraps another file writer that tracks the amount of data written. /// When the data size exceeds the target size, it closes the current file and /// starts writing to a new one. -pub struct RollingFileWriter { +pub struct RollingFileWriter { inner: Option, inner_builder: B, target_file_size: usize, data_file_builders: Vec, + file_io: FileIO, + location_generator: L, + file_name_generator: F, +} + +impl Debug for RollingFileWriter +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RollingFileWriter") + .field("target_file_size", &self.target_file_size) + .field("file_io", &self.file_io) + .finish() + } } -impl RollingFileWriter { +impl RollingFileWriter +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ /// Determines if the writer should roll over to a new file. /// /// # Returns @@ -84,13 +160,42 @@ impl RollingFileWriter { fn should_roll(&self) -> bool { self.current_written_size() > self.target_file_size } -} -impl FileWriter for RollingFileWriter { - async fn write(&mut self, input: &RecordBatch) -> Result<()> { + fn new_output_file(&self, partition_key: &Option) -> Result { + self.file_io + .new_output(self.location_generator.generate_location( + partition_key.as_ref(), + &self.file_name_generator.generate_file_name(), + )) + } + + /// Writes a record batch to the current file, rolling over to a new file if necessary. + /// + /// # Parameters + /// + /// * `partition_key` - Optional partition key for the data + /// * `input` - The record batch to write + /// + /// # Returns + /// + /// A `Result` indicating success or failure + /// + /// # Errors + /// + /// Returns an error if the writer is not initialized or if writing fails + pub async fn write( + &mut self, + partition_key: &Option, + input: &RecordBatch, + ) -> Result<()> { if self.inner.is_none() { // initialize inner writer - self.inner = Some(self.inner_builder.clone().build().await?); + self.inner = Some( + self.inner_builder + .clone() + .build(self.new_output_file(partition_key)?) + .await?, + ); } if self.should_roll() { @@ -99,7 +204,12 @@ impl FileWriter for RollingFileWriter { self.data_file_builders.extend(inner.close().await?); // start a new writer - self.inner = Some(self.inner_builder.clone().build().await?); + self.inner = Some( + self.inner_builder + .clone() + .build(self.new_output_file(partition_key)?) + .await?, + ); } } @@ -114,7 +224,13 @@ impl FileWriter for RollingFileWriter { } } - async fn close(mut self) -> Result> { + /// Closes the writer and returns all data file builders. 
+ /// + /// # Returns + /// + /// A `Result` containing a vector of `DataFileBuilder` instances representing + /// all files that were written, including any that were created due to rollover + pub async fn close(mut self) -> Result> { // close the current writer and merge the output if let Some(current_writer) = self.inner { self.data_file_builders @@ -125,7 +241,9 @@ impl FileWriter for RollingFileWriter { } } -impl CurrentFileStatus for RollingFileWriter { +impl CurrentFileStatus + for RollingFileWriter +{ fn current_file_path(&self) -> String { self.inner.as_ref().unwrap().current_file_path() } @@ -199,22 +317,20 @@ mod tests { let schema = make_test_schema()?; // Create writer builders - let parquet_writer_builder = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(schema), - None, - file_io.clone(), - location_gen, - file_name_gen, - ); + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)); // Set a large target size so no rolling occurs - let rolling_writer_builder = RollingFileWriterBuilder::new( + let rolling_file_writer_builder = RollingFileWriterBuilder::new( parquet_writer_builder, - 1024 * 1024, // 1MB, large enough to not trigger rolling + 1024 * 1024, + file_io.clone(), + location_gen, + file_name_gen, ); - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0); + let data_file_writer_builder = + DataFileWriterBuilder::new(rolling_file_writer_builder, None); // Create writer let mut writer = data_file_writer_builder.build().await?; @@ -260,19 +376,19 @@ mod tests { let schema = make_test_schema()?; // Create writer builders - let parquet_writer_builder = ParquetWriterBuilder::new( - WriterProperties::builder().build(), - Arc::new(schema), - None, - file_io.clone(), + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)); + + // Set a very small target size to trigger rolling + let rolling_writer_builder = RollingFileWriterBuilder::new( + parquet_writer_builder, + 1024, + file_io, location_gen, file_name_gen, ); - // Set a very small target size to trigger rolling - let rolling_writer_builder = RollingFileWriterBuilder::new(parquet_writer_builder, 1024); - - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None); // Create writer let mut writer = data_file_writer_builder.build().await?; diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 8f17d50e27..d5a8a66861 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -60,6 +60,9 @@ //! async fn main() -> Result<()> { //! // Connect to a catalog. //! use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; +//! use iceberg::writer::file_writer::rolling_writer::{ +//! RollingFileWriter, RollingFileWriterBuilder, +//! }; //! let catalog = MemoryCatalogBuilder::default() //! .load( //! "memory", @@ -86,15 +89,21 @@ //! let parquet_writer_builder = ParquetWriterBuilder::new( //! WriterProperties::default(), //! table.metadata().current_schema().clone(), -//! None, +//! ); +//! +//! // Create a rolling file writer using parquet file writer builder. +//! let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( +//! parquet_writer_builder, //! table.file_io().clone(), //! location_generator.clone(), //! 
file_name_generator.clone(), //! ); +//! //! // Create a data file writer using parquet file writer builder. -//! let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); +//! let data_file_writer_builder = +//! DataFileWriterBuilder::new(rolling_file_writer_builder, None); //! // Build the data file writer -//! let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); +//! let mut data_file_writer = data_file_writer_builder.build().await?; //! //! // Write the data using data_file_writer... //! @@ -174,6 +183,9 @@ //! // Connect to a catalog. //! use iceberg::memory::MEMORY_CATALOG_WAREHOUSE; //! use iceberg::spec::{Literal, PartitionKey, Struct}; +//! use iceberg::writer::file_writer::rolling_writer::{ +//! RollingFileWriter, RollingFileWriterBuilder, +//! }; //! //! let catalog = MemoryCatalogBuilder::default() //! .load( @@ -207,13 +219,20 @@ //! let parquet_writer_builder = ParquetWriterBuilder::new( //! WriterProperties::default(), //! table.metadata().current_schema().clone(), -//! Some(partition_key), +//! ); +//! +//! // Create a rolling file writer +//! let rolling_file_writer_builder = RollingFileWriterBuilder::new( +//! parquet_writer_builder, +//! 512 * 1024 * 1024, //! table.file_io().clone(), //! location_generator.clone(), //! file_name_generator.clone(), //! ); -//! // Create a data file writer builder using parquet file writer builder. -//! let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); +//! +//! // Create a data file writer builder using rolling file writer. +//! let data_file_writer_builder = +//! DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)); //! // Create latency record writer using data file writer builder. //! let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder); //! 
// Build the final writer diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index 9a3d62137e..f4cba959ee 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -27,6 +27,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, CatalogBuilder, TableCreation}; use iceberg_catalog_rest::RestCatalogBuilder; @@ -74,12 +75,14 @@ async fn test_append_data_file() { let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), table.metadata().current_schema().clone(), - None, + ); + let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None); let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index c737357ee9..0da88f1a09 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -31,6 +31,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, CatalogBuilder, TableCreation}; use iceberg_catalog_rest::RestCatalogBuilder; @@ -96,18 +97,18 @@ async fn test_append_partition_data_file() { let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), table.metadata().current_schema().clone(), - Some(partition_key), + ); + + let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder.clone(), table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), ); let mut data_file_writer_valid = DataFileWriterBuilder::new( - parquet_writer_builder.clone(), - Some(Struct::from_iter([Some(Literal::Primitive( - PrimitiveLiteral::Int(first_partition_id_value), - ))])), - 0, + rolling_file_writer_builder.clone(), + Some(partition_key.clone()), ) .build() .await @@ -148,44 +149,53 @@ async fn test_append_partition_data_file() { assert_eq!(batches.len(), 1); assert_eq!(batches[0], batch); + let partition_key = partition_key.copy_with_data(Struct::from_iter([Some( + Literal::Primitive(PrimitiveLiteral::Boolean(true)), + )])); test_schema_incompatible_partition_type( - parquet_writer_builder.clone(), + rolling_file_writer_builder.clone(), batch.clone(), + 
partition_key.clone(), table.clone(), &rest_catalog, ) .await; + let partition_key = partition_key.copy_with_data(Struct::from_iter([ + Some(Literal::Primitive(PrimitiveLiteral::Int( + first_partition_id_value, + ))), + Some(Literal::Primitive(PrimitiveLiteral::Int( + first_partition_id_value, + ))), + ])); test_schema_incompatible_partition_fields( - parquet_writer_builder, + rolling_file_writer_builder.clone(), batch, + partition_key, table, &rest_catalog, - first_partition_id_value, ) .await; } async fn test_schema_incompatible_partition_type( - parquet_writer_builder: ParquetWriterBuilder< + rolling_file_writer_builder: RollingFileWriterBuilder< + ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator, >, batch: RecordBatch, + partition_key: PartitionKey, table: Table, catalog: &dyn Catalog, ) { // test writing different "type" of partition than mentioned in schema - let mut data_file_writer_invalid = DataFileWriterBuilder::new( - parquet_writer_builder.clone(), - Some(Struct::from_iter([Some(Literal::Primitive( - PrimitiveLiteral::Boolean(true), - ))])), - 0, - ) - .build() - .await - .unwrap(); + let mut data_file_writer_invalid = + DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)) + .build() + .await + .unwrap(); data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); @@ -200,32 +210,22 @@ async fn test_schema_incompatible_partition_type( } async fn test_schema_incompatible_partition_fields( - parquet_writer_builder: ParquetWriterBuilder< + rolling_file_writer_builder: RollingFileWriterBuilder< + ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator, >, batch: RecordBatch, + partition_key: PartitionKey, table: Table, catalog: &dyn Catalog, - first_partition_id_value: i32, ) { // test writing different number of partition fields than mentioned in schema - - let mut data_file_writer_invalid = DataFileWriterBuilder::new( - parquet_writer_builder, - Some(Struct::from_iter([ - Some(Literal::Primitive(PrimitiveLiteral::Int( - first_partition_id_value, - ))), - Some(Literal::Primitive(PrimitiveLiteral::Int( - first_partition_id_value, - ))), - ])), - 0, - ) - .build() - .await - .unwrap(); + let mut data_file_writer_invalid = + DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)) + .build() + .await + .unwrap(); data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 4b85612ff5..a248fa707a 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -27,6 +27,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, CatalogBuilder, TableCreation}; use iceberg_catalog_rest::RestCatalogBuilder; @@ -73,12 +74,14 @@ async fn test_append_data_file_conflict() { let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), table.metadata().current_schema().clone(), - None, + ); + let rolling_file_writer_builder = 
RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None); let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index b8e7c5941e..1125de11a5 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -39,6 +39,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, CatalogBuilder, TableCreation}; use iceberg_catalog_rest::RestCatalogBuilder; @@ -155,12 +156,14 @@ async fn test_scan_all_type() { let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), table.metadata().current_schema().clone(), - None, + ); + let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None); let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); // Prepare data diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 625405c95b..712da92b21 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -208,7 +208,6 @@ impl ExecutionPlan for IcebergWriteExec { )); } - let spec_id = self.table.metadata().default_partition_spec_id(); let partition_type = self.table.metadata().default_partition_type().clone(); let format_version = self.table.metadata().format_version(); @@ -235,13 +234,7 @@ impl ExecutionPlan for IcebergWriteExec { let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode( WriterProperties::default(), self.table.metadata().current_schema().clone(), - None, FieldMatchMode::Name, - self.table.file_io().clone(), - DefaultLocationGenerator::new(self.table.metadata().clone()) - .map_err(to_datafusion_error)?, - // todo filename prefix/suffix should be configurable - DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format), ); let target_file_size = match self .table @@ -261,10 +254,22 @@ impl ExecutionPlan for IcebergWriteExec { .map_err(to_datafusion_error)?, None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, }; - let rolling_writer_builder = - RollingFileWriterBuilder::new(parquet_file_writer_builder, target_file_size); - let data_file_writer_builder = - DataFileWriterBuilder::new(rolling_writer_builder, None, spec_id); + + let file_io = self.table.file_io().clone(); + let location_generator 
= DefaultLocationGenerator::new(self.table.metadata().clone()) + .map_err(to_datafusion_error)?; + // todo filename prefix/suffix should be configurable + let file_name_generator = + DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format); + let rolling_writer_builder = RollingFileWriterBuilder::new( + parquet_file_writer_builder, + target_file_size, + file_io, + location_generator, + file_name_generator, + ); + // todo specify partition key when partitioning writer is supported + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None); // Get input data let data = execute_input_stream(
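// Illustrative usage sketch (not part of the patch): how the refactored writer stack is
// assembled after this change, pieced together from the updated doc example in
// `writer/mod.rs` and the integration tests above. `table`, `partition_key`, and `batch`
// are assumed inputs, the "data" file-name prefix is a placeholder, and catalog setup and
// error handling are elided; import paths follow the `use` statements visible in the diff.
use arrow_array::RecordBatch;
use iceberg::spec::{DataFile, DataFileFormat, PartitionKey};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use parquet::file::properties::WriterProperties;

async fn write_batch(
    table: &Table,
    partition_key: Option<PartitionKey>,
    batch: RecordBatch,
) -> iceberg::Result<Vec<DataFile>> {
    // File-format writer: after this change it only needs writer properties and the schema;
    // FileIO and the location/file-name generators move up into the rolling writer.
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
    );

    // Rolling writer: owns FileIO plus the generators, creates each output file itself, and
    // rolls to a new file once the written size passes the (default) target file size.
    let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
        parquet_writer_builder,
        table.file_io().clone(),
        DefaultLocationGenerator::new(table.metadata().clone())?,
        DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet),
    );

    // Data file writer: the partition value and spec id are now carried by an optional
    // `PartitionKey` instead of a raw `Struct` plus a separate spec id argument.
    let mut data_file_writer =
        DataFileWriterBuilder::new(rolling_file_writer_builder, partition_key)
            .build()
            .await?;

    data_file_writer.write(batch).await?;
    data_file_writer.close().await
}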