diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs
index 0c0032c4f7..61312e33d8 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -26,7 +26,7 @@ use crate::spec::{DataFileFormat, TableMetadata};
 /// `LocationGenerator` used to generate the location of data file.
 pub trait LocationGenerator: Clone + Send + 'static {
     /// Generate an absolute path for the given file name.
-    /// e.g
+    /// e.g.
     /// For file name "part-00000.parquet", the generated location maybe "/table/data/part-00000.parquet"
     fn generate_location(&self, file_name: &str) -> String;
 }
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs
index dbf747ec12..d655a1cd06 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -29,6 +29,8 @@ pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
 mod track_writer;
 
 pub mod location_generator;
+/// Module providing writers that can automatically roll over to new files based on size thresholds.
+pub mod rolling_writer;
 
 type DefaultOutput = Vec<DataFileBuilder>;
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index ef791b35f0..c8e9a2315f 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -86,7 +86,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
 impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
     type R = ParquetWriter;
 
-    async fn build(self) -> crate::Result<Self::R> {
+    async fn build(self) -> Result<Self::R> {
         let written_size = Arc::new(AtomicI64::new(0));
         let out_file = self.file_io.new_output(
             self.location_generator
@@ -517,7 +517,7 @@ impl ParquetWriter {
 impl FileWriter for ParquetWriter {
-    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> {
+    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
         // Skip empty batch
         if batch.num_rows() == 0 {
             return Ok(());
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
new file mode 100644
index 0000000000..93fa975ce1
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
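+
+//! A rolling file writer that switches to a new file once the written size
+//! passes a target threshold.
+//!
+//! A minimal usage sketch (illustrative only, not compiled; it assumes a
+//! `parquet_writer_builder` configured as in the tests at the bottom of
+//! this file):
+//!
+//! ```ignore
+//! // Roll over to a new file after roughly 128 MiB have been written.
+//! let builder = RollingFileWriterBuilder::new(parquet_writer_builder, 128 * 1024 * 1024);
+//! let mut writer = builder.build().await?;
+//! writer.write(&batch).await?; // `batch` is an `arrow_array::RecordBatch`
+//! let data_file_builders = writer.close().await?;
+//! ```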
+
+use arrow_array::RecordBatch;
+
+use crate::spec::DataFileBuilder;
+use crate::writer::CurrentFileStatus;
+use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::{Error, ErrorKind, Result};
+
+/// Builder for creating a `RollingFileWriter` that rolls over to a new file
+/// when the data size exceeds a target threshold.
+#[derive(Clone)]
+pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
+    inner_builder: B,
+    target_file_size: usize,
+}
+
+impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
+    /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
+    ///
+    /// # Arguments
+    ///
+    /// * `inner_builder` - The builder for the underlying file writer
+    /// * `target_file_size` - The target size in bytes before rolling over to a new file
+    ///
+    /// NOTE: The `target_file_size` does not exactly reflect the final size on physical storage.
+    /// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior.
+    /// The actual file size on disk is expected to be slightly larger than `target_file_size`.
+    pub fn new(inner_builder: B, target_file_size: usize) -> Self {
+        Self {
+            inner_builder,
+            target_file_size,
+        }
+    }
+}
+
+impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
+    type R = RollingFileWriter<B>;
+
+    async fn build(self) -> Result<Self::R> {
+        Ok(RollingFileWriter {
+            inner: None,
+            inner_builder: self.inner_builder,
+            target_file_size: self.target_file_size,
+            data_file_builders: vec![],
+        })
+    }
+}
+
+/// A writer that automatically rolls over to a new file when the data size
+/// exceeds a target threshold.
+///
+/// This writer wraps another file writer that tracks the amount of data written.
+/// When the data size exceeds the target size, it closes the current file and
+/// starts writing to a new one.
+pub struct RollingFileWriter<B: FileWriterBuilder> {
+    inner: Option<B::R>,
+    inner_builder: B,
+    target_file_size: usize,
+    data_file_builders: Vec<DataFileBuilder>,
+}
+
+impl<B: FileWriterBuilder> RollingFileWriter<B> {
+    /// Determines if the writer should roll over to a new file.
+    ///
+    /// # Returns
+    ///
+    /// `true` if a new file should be started, `false` otherwise
+    fn should_roll(&self) -> bool {
+        self.current_written_size() > self.target_file_size
+    }
+}
+
+impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
+    async fn write(&mut self, input: &RecordBatch) -> Result<()> {
+        if self.inner.is_none() {
+            // initialize inner writer
+            self.inner = Some(self.inner_builder.clone().build().await?);
+        }
+
+        if self.should_roll() {
+            if let Some(inner) = self.inner.take() {
+                // close the current writer, roll to a new file
+                self.data_file_builders.extend(inner.close().await?);
+
+                // start a new writer
+                self.inner = Some(self.inner_builder.clone().build().await?);
+            }
+        }
+
+        // write the input
+        if let Some(writer) = self.inner.as_mut() {
+            Ok(writer.write(input).await?)
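+            // Note: `should_roll` is checked before this write, so a single
+            // large batch can push the current file past `target_file_size`;
+            // the rollover then happens on the next call.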
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                "Writer is not initialized!",
+            ))
+        }
+    }
+
+    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
+        // close the current writer and merge the output
+        if let Some(current_writer) = self.inner {
+            self.data_file_builders
+                .extend(current_writer.close().await?);
+        }
+
+        Ok(self.data_file_builders)
+    }
+}
+
+impl<B: FileWriterBuilder> CurrentFileStatus for RollingFileWriter<B> {
+    fn current_file_path(&self) -> String {
+        self.inner.as_ref().unwrap().current_file_path()
+    }
+
+    fn current_row_num(&self) -> usize {
+        self.inner.as_ref().unwrap().current_row_num()
+    }
+
+    fn current_written_size(&self) -> usize {
+        self.inner.as_ref().unwrap().current_written_size()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_array::{ArrayRef, Int32Array, StringArray};
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use parquet::file::properties::WriterProperties;
+    use rand::prelude::IteratorRandom;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, Type};
+    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use crate::writer::file_writer::ParquetWriterBuilder;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
+    use crate::writer::tests::check_parquet_data_file;
+    use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
+
+    fn make_test_schema() -> Result<Schema> {
+        Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+    }
+
+    fn make_test_arrow_schema() -> ArrowSchema {
+        ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                1.to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                2.to_string(),
+            )])),
+        ])
+    }
+
+    #[tokio::test]
+    async fn test_rolling_writer_basic() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let file_io = FileIOBuilder::new_fs_io().build()?;
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // Create schema
+        let schema = make_test_schema()?;
+
+        // Create writer builders
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(schema),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+
+        // Set a large target size so no rolling occurs
+        let rolling_writer_builder = RollingFileWriterBuilder::new(
+            parquet_writer_builder,
+            1024 * 1024, // 1MB, large enough to not trigger rolling
+        );
+
+        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);
+
+        // Create writer
+        let mut writer = data_file_writer_builder.build().await?;
+
+        // Create test data
+        let arrow_schema = make_test_arrow_schema();
+
+        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])?;
+
+        // Write data
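+        // `IcebergWriter::write` takes the batch by value, so it is cloned
+        // here to keep a copy for verifying the file contents below.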
+        writer.write(batch.clone()).await?;
+
+        // Close writer and get data files
+        let data_files = writer.close().await?;
+
+        // Verify only one file was created
+        assert_eq!(
+            data_files.len(),
+            1,
+            "Expected only one data file to be created"
+        );
+
+        // Verify file content
+        check_parquet_data_file(&file_io, &data_files[0], &batch).await;
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_rolling_writer_with_rolling() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let file_io = FileIOBuilder::new_fs_io().build()?;
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // Create schema
+        let schema = make_test_schema()?;
+
+        // Create writer builders
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(schema),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+
+        // Set a very small target size to trigger rolling
+        let rolling_writer_builder = RollingFileWriterBuilder::new(parquet_writer_builder, 1024);
+
+        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);
+
+        // Create writer
+        let mut writer = data_file_writer_builder.build().await?;
+
+        // Create test data
+        let arrow_schema = make_test_arrow_schema();
+        let arrow_schema_ref = Arc::new(arrow_schema.clone());
+
+        let names = vec![
+            "Alice", "Bob", "Charlie", "Dave", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy",
+            "Kelly", "Larry", "Mallory", "Shawn",
+        ];
+
+        let mut rng = rand::thread_rng();
+        let batch_num = 10;
+        let batch_rows = 100;
+        let expected_rows = batch_num * batch_rows;
+
+        for i in 0..batch_num {
+            let int_values: Vec<i32> = (0..batch_rows).map(|row| i * batch_rows + row).collect();
+            let str_values: Vec<&str> = (0..batch_rows)
+                .map(|_| *names.iter().choose(&mut rng).unwrap())
+                .collect();
+
+            let int_array = Arc::new(Int32Array::from(int_values)) as ArrayRef;
+            let str_array = Arc::new(StringArray::from(str_values)) as ArrayRef;
+
+            let batch =
+                RecordBatch::try_new(Arc::clone(&arrow_schema_ref), vec![int_array, str_array])
+                    .expect("Failed to create RecordBatch");
+
+            writer.write(batch).await?;
+        }
+
+        // Close writer and get data files
+        let data_files = writer.close().await?;
+
+        // Verify multiple files were created (more than 4)
+        assert!(
+            data_files.len() > 4,
+            "Expected more than 4 data files to be created, but got {}",
+            data_files.len()
+        );
+
+        // Verify total record count across all files
+        let total_records: u64 = data_files.iter().map(|file| file.record_count).sum();
+        assert_eq!(
+            total_records, expected_rows as u64,
+            "Expected {} total records across all files",
+            expected_rows
+        );
+
+        Ok(())
+    }
+}
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index bb597b3749..2076d7707d 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -22,11 +22,11 @@
 //! 2. IcebergWriter: writer for logical format provided by iceberg table (Such as data file, equality delete file, position delete file)
 //! or other function (Such as partition writer, delta writer).
 //!
-//! The IcebergWriter will use FileWriter to write underly physical file.
+//! The IcebergWriter will use the inner FileWriter to write physical files.
 //!
-//! We hope the writer interface can be extensible and flexible. Each writer can be create config independently
-//! and combined together to build a writer which have complex write logic. E.g. combine `FanoutPartitionWriter`, `DataFileWriter`, `ParquetWriter` to get
-//! a writer can split the data automatelly according to partition and write down as parquet physical format.
+//! The writer interface is designed to be extensible and flexible. Writers can be independently configured
+//! and composed to support complex write logic. For example, by combining `FanoutPartitionWriter`, `DataFileWriter`,
+//! and `ParquetWriter`, you can build a writer that automatically partitions the data and writes it in the Parquet format.
 //!
 //! For this purpose, there are four trait corresponding to these writer:
 //! - IcebergWriterBuilder
@@ -34,8 +34,9 @@
 //! - FileWriterBuilder
 //! - FileWriter
 //!
-//! User can create specific writer builder, combine them and build the writer finally. Also user can custom
-//! own writer and implement writer trait for them so that the custom writer can integrate with existing writer. (See following example)
+//! Users can create specific writer builders, combine them, and build the final writer.
+//! They can also define custom writers by implementing the writer traits above,
+//! allowing seamless integration with existing writers. (See the example below.)
 //!
 //! # Simple example for the data file writer used parquet physical format:
 //! ```rust, no_run
@@ -231,8 +232,8 @@ pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
     async fn close(&mut self) -> Result<O>;
 }
 
-/// The current file status of iceberg writer. It implement for the writer which write a single
-/// file.
+/// The current file status of the Iceberg writer.
+/// This is implemented for writers that write a single file at a time.
 pub trait CurrentFileStatus {
     /// Get the current file path.
     fn current_file_path(&self) -> String;