From 4403eb392b1638685986e536855ee041688cb0e3 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sun, 27 Jul 2025 21:29:54 -0700 Subject: [PATCH 01/18] Fix current_written_size for ParquetWriter --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 75b3d9244a..a6c4297a79 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -611,7 +611,14 @@ impl CurrentFileStatus for ParquetWriter { } fn current_written_size(&self) -> usize { - self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize + if let Some(inner) = self.inner_writer.as_ref() { + // inner/AsyncArrowWriter contains sync and async writers + // written size = bytes flushed to inner's async writer + bytes buffered in the inner's sync writer + inner.bytes_written() + inner.in_progress_size() + } else { + // inner writer is not initialized yet + 0 + } } } From fe4b9be36c34cc983bdfaacc435a102743190d32 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sun, 27 Jul 2025 21:40:05 -0700 Subject: [PATCH 02/18] minor --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index a6c4297a79..5b39ee06cf 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -229,6 +229,8 @@ pub struct ParquetWriter { out_file: OutputFile, inner_writer: Option>>, writer_properties: WriterProperties, + // written_size is only accurate after closing the inner writer, + // because the inner writer flushes data asynchronously. written_size: Arc, current_row_num: usize, nan_value_count_visitor: NanValueCountVisitor, From f394c2269c3fac72b9e3363e63f528fdd25de1c5 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sun, 27 Jul 2025 21:57:56 -0700 Subject: [PATCH 03/18] Trigger Build From 14f8b67a7ef1722cede5d4bc51444f0bc1e30d8c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 18:20:54 -0700 Subject: [PATCH 04/18] rolling --- crates/iceberg/src/writer/base_writer/mod.rs | 1 + .../src/writer/base_writer/rolling_writer.rs | 112 ++++++++++++++++++ rust-toolchain.toml | 2 +- 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/writer/base_writer/rolling_writer.rs diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6d..f156fba2a9 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,4 @@ pub mod data_file_writer; pub mod equality_delete_writer; +pub mod rolling_writer; diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs new file mode 100644 index 0000000000..b8a8fd64d9 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::mem::take; + +use arrow_array::RecordBatch; +use async_trait::async_trait; + +use crate::spec::DataFile; +use crate::writer::base_writer::data_file_writer::DataFileWriter; +use crate::writer::file_writer::FileWriterBuilder; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +#[async_trait] +pub trait RollingFileWriter: IcebergWriter { + fn should_roll(&mut self, input_size: u64) -> bool; +} + +#[derive(Clone)] +pub struct RollingDataFileWriterBuilder { + inner_builder: B, + target_size: u64, +} + +impl RollingDataFileWriterBuilder { + pub fn new(inner_builder: B, target_size: u64) -> Self { + Self { + inner_builder, + target_size, + } + } +} + +#[async_trait] +impl IcebergWriterBuilder for RollingDataFileWriterBuilder { + type R = RollingDataFileWriter; + + async fn build(self) -> Result { + Ok(RollingDataFileWriter { + inner: None, + inner_builder: self.inner_builder, + target_size: self.target_size, + written_size: 0, + data_files: vec![], + }) + } +} + +pub struct RollingDataFileWriter { + inner: Option>, + inner_builder: B, + target_size: u64, + written_size: u64, + data_files: Vec, +} + +#[async_trait] +impl IcebergWriter for RollingDataFileWriter { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + let input_size = input.get_array_memory_size() as u64; + if self.should_roll(input_size) { + if let Some(mut inner) = self.inner.take() { + // close the current writer, roll to a new file + self.data_files.extend(inner.close().await?); + } + + // clear bytes written + self.written_size = 0; + } + + if self.inner.is_none() { + // start a new writer + self.inner = Some(self.inner_builder.clone().build().await?); + } + + // write the input and count bytes written + let Some(writer) = self.inner.as_mut() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Writer is not initialized!", + )); + }; + writer.write(input).await?; + self.written_size += input_size; + Ok(()) + } + + async fn close(&mut self) -> Result> { + Ok(take(&mut self.data_files)) + } +} + +impl RollingFileWriter for RollingDataFileWriter { + fn should_roll(&mut self, input_size: u64) -> bool { + self.written_size + input_size > self.target_size + } +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index a9a807133e..6dab34d58a 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -20,5 +20,5 @@ # # The channel is exactly same day for our MSRV. 
[toolchain] -channel = "nightly-2025-02-20" +channel = "nightly-2025-05-20" components = ["rustfmt", "clippy"] From 67300dab7a24b5ecb7bb683a3ad98792b940a7a9 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 21:39:03 -0700 Subject: [PATCH 05/18] rolling in the deep --- crates/iceberg/src/writer/base_writer/mod.rs | 1 + .../src/writer/base_writer/rolling_writer.rs | 65 +++++++++++++++---- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index f156fba2a9..80770b75ea 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,4 +19,5 @@ pub mod data_file_writer; pub mod equality_delete_writer; +/// Module providing writers that can automatically roll over to new files based on size thresholds. pub mod rolling_writer; diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs index b8a8fd64d9..31b40d5252 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -19,25 +19,46 @@ use std::mem::take; use arrow_array::RecordBatch; use async_trait::async_trait; +use futures::future::try_join_all; +use crate::runtime::{JoinHandle, spawn}; use crate::spec::DataFile; -use crate::writer::base_writer::data_file_writer::DataFileWriter; -use crate::writer::file_writer::FileWriterBuilder; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; +/// A writer that can roll over to a new file when certain conditions are met. +/// +/// This trait extends `IcebergWriter` with the ability to determine when to start +/// writing to a new file based on the size of incoming data. #[async_trait] pub trait RollingFileWriter: IcebergWriter { + /// Determines if the writer should roll over to a new file. + /// + /// # Arguments + /// + /// * `input_size` - The size in bytes of the incoming data + /// + /// # Returns + /// + /// `true` if a new file should be started, `false` otherwise fn should_roll(&mut self, input_size: u64) -> bool; } +/// Builder for creating a `RollingDataFileWriter` that rolls over to a new file +/// when the data size exceeds a target threshold. #[derive(Clone)] -pub struct RollingDataFileWriterBuilder { +pub struct RollingDataFileWriterBuilder { inner_builder: B, target_size: u64, } -impl RollingDataFileWriterBuilder { +impl RollingDataFileWriterBuilder { + /// Creates a new `RollingDataFileWriterBuilder` with the specified inner builder and target size. + /// + /// # Arguments + /// + /// * `inner_builder` - The builder for the underlying file writer + /// * `target_size` - The target size in bytes before rolling over to a new file pub fn new(inner_builder: B, target_size: u64) -> Self { Self { inner_builder, @@ -47,7 +68,7 @@ impl RollingDataFileWriterBuilder { } #[async_trait] -impl IcebergWriterBuilder for RollingDataFileWriterBuilder { +impl IcebergWriterBuilder for RollingDataFileWriterBuilder { type R = RollingDataFileWriter; async fn build(self) -> Result { @@ -56,27 +77,34 @@ impl IcebergWriterBuilder for RollingDataFileWriterBuilder inner_builder: self.inner_builder, target_size: self.target_size, written_size: 0, - data_files: vec![], + close_handles: vec![], }) } } -pub struct RollingDataFileWriter { - inner: Option>, +/// A writer that automatically rolls over to a new file when the data size +/// exceeds a target threshold. 
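+/// Rolled-over files are closed on spawned background tasks, and their
+/// results are collected when this writer itself is closed.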
+/// +/// This writer wraps another file writer and tracks the amount of data written. +/// When the data size exceeds the target size, it closes the current file and +/// starts writing to a new one. +pub struct RollingDataFileWriter { + inner: Option, inner_builder: B, target_size: u64, written_size: u64, - data_files: Vec, + close_handles: Vec>>>, } #[async_trait] -impl IcebergWriter for RollingDataFileWriter { +impl IcebergWriter for RollingDataFileWriter { async fn write(&mut self, input: RecordBatch) -> Result<()> { let input_size = input.get_array_memory_size() as u64; if self.should_roll(input_size) { if let Some(mut inner) = self.inner.take() { // close the current writer, roll to a new file - self.data_files.extend(inner.close().await?); + let handle = spawn(async move { inner.close().await }); + self.close_handles.push(handle) } // clear bytes written @@ -101,11 +129,22 @@ impl IcebergWriter for RollingDataFileWriter { } async fn close(&mut self) -> Result> { - Ok(take(&mut self.data_files)) + let mut data_files = try_join_all(take(&mut self.close_handles)) + .await? + .into_iter() + .flatten() + .collect::>(); + + // close the current writer and merge the output + if let Some(mut current_writer) = take(&mut self.inner) { + data_files.extend(current_writer.close().await?); + } + + Ok(data_files) } } -impl RollingFileWriter for RollingDataFileWriter { +impl RollingFileWriter for RollingDataFileWriter { fn should_roll(&mut self, input_size: u64) -> bool { self.written_size + input_size > self.target_size } From 3033cabd422aa86a2825b5c46b62ef3110b98926 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 21:48:46 -0700 Subject: [PATCH 06/18] rolls the unit tests --- .../src/writer/base_writer/rolling_writer.rs | 189 ++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs index 31b40d5252..cdb4a6531c 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -149,3 +149,192 @@ impl RollingFileWriter for RollingDataFileWriter { self.written_size + input_size > self.target_size } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, Type}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::tests::check_parquet_data_file; + use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch}; + + #[tokio::test] + async fn test_rolling_writer_basic() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Schema::builder() + 
.with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + // Create writer builders + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // Set a large target size so no rolling occurs + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 1024 * 1024, // 1MB, large enough to not trigger rolling + ); + + // Create writer + let mut writer = rolling_writer_builder.build().await?; + + // Create test data + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]); + + let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ])?; + + // Write data + writer.write(batch.clone()).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify only one file was created + assert_eq!( + data_files.len(), + 1, + "Expected only one data file to be created" + ); + + // Verify file content + check_parquet_data_file(&file_io, &data_files[0], &batch).await; + + Ok(()) + } + + #[tokio::test] + async fn test_rolling_writer_with_rolling() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + // Create writer builders + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // Set a very small target size to trigger rolling + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 100, // Very small target size to ensure rolling + ); + + // Create writer + let mut writer = rolling_writer_builder.build().await?; + + // Create test data + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]); + + // Create multiple batches to trigger rolling + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + 
])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(StringArray::from(vec!["Dave", "Eve", "Frank"])), + ])?; + + let batch3 = RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(Int32Array::from(vec![7, 8, 9])), + Arc::new(StringArray::from(vec!["Grace", "Heidi", "Ivan"])), + ])?; + + // Write data + writer.write(batch1.clone()).await?; + writer.write(batch2.clone()).await?; + writer.write(batch3.clone()).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify multiple files were created (at least 2) + assert!( + data_files.len() > 1, + "Expected multiple data files to be created, got {}", + data_files.len() + ); + + // Verify total record count across all files + let total_records: u64 = data_files.iter().map(|file| file.record_count).sum(); + assert_eq!( + total_records, 9, + "Expected 9 total records across all files" + ); + + // Verify each file has the correct content + // Note: We can't easily verify which records went to which file without more complex logic, + // but we can verify the total count and that each file has valid content + + Ok(()) + } +} From 02c1acce2e13c7aa58f43ae645f8cf7c156dd151 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 21:55:51 -0700 Subject: [PATCH 07/18] could have it all for tests --- .../src/writer/base_writer/rolling_writer.rs | 69 +++++++++---------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs index cdb4a6531c..31d7226422 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -171,23 +171,40 @@ mod tests { use crate::writer::tests::check_parquet_data_file; use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch}; + fn make_test_schema() -> Result { + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + } + + fn make_test_arrow_schema() -> ArrowSchema { + ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]) + } + #[tokio::test] async fn test_rolling_writer_basic() -> Result<()> { - let temp_dir = TempDir::new().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; let location_gen = MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); // Create schema - let schema = Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), - ]) - .build()?; + let schema = make_test_schema()?; // Create writer builders let parquet_writer_builder = ParquetWriterBuilder::new( @@ -209,16 +226,7 @@ mod tests { let mut writer = rolling_writer_builder.build().await?; // Create test 
data - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 1.to_string(), - )])), - Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 2.to_string(), - )])), - ]); + let arrow_schema = make_test_arrow_schema(); let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![ Arc::new(Int32Array::from(vec![1, 2, 3])), @@ -246,21 +254,15 @@ mod tests { #[tokio::test] async fn test_rolling_writer_with_rolling() -> Result<()> { - let temp_dir = TempDir::new().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; let location_gen = MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); // Create schema - let schema = Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), - ]) - .build()?; + let schema = make_test_schema()?; // Create writer builders let parquet_writer_builder = ParquetWriterBuilder::new( @@ -282,16 +284,7 @@ mod tests { let mut writer = rolling_writer_builder.build().await?; // Create test data - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 1.to_string(), - )])), - Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 2.to_string(), - )])), - ]); + let arrow_schema = make_test_arrow_schema(); // Create multiple batches to trigger rolling let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ From 50a903db5fd35b01d451224dfa2bdc32d28c3077 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 22 Jul 2025 17:34:54 -0700 Subject: [PATCH 08/18] keep rolling with the FileWriter --- .../src/writer/base_writer/rolling_writer.rs | 148 +++++++++--------- .../src/writer/file_writer/parquet_writer.rs | 4 +- crates/iceberg/src/writer/mod.rs | 4 +- rust-toolchain.toml | 2 +- 4 files changed, 78 insertions(+), 80 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs index 31d7226422..e9de42ab57 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -15,51 +15,31 @@ // specific language governing permissions and limitations // under the License. -use std::mem::take; - use arrow_array::RecordBatch; -use async_trait::async_trait; use futures::future::try_join_all; use crate::runtime::{JoinHandle, spawn}; -use crate::spec::DataFile; -use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::spec::DataFileBuilder; +use crate::writer::CurrentFileStatus; +use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::{Error, ErrorKind, Result}; -/// A writer that can roll over to a new file when certain conditions are met. -/// -/// This trait extends `IcebergWriter` with the ability to determine when to start -/// writing to a new file based on the size of incoming data. 
-#[async_trait] -pub trait RollingFileWriter: IcebergWriter { - /// Determines if the writer should roll over to a new file. - /// - /// # Arguments - /// - /// * `input_size` - The size in bytes of the incoming data - /// - /// # Returns - /// - /// `true` if a new file should be started, `false` otherwise - fn should_roll(&mut self, input_size: u64) -> bool; -} - -/// Builder for creating a `RollingDataFileWriter` that rolls over to a new file +/// Builder for creating a `RollingFileWriter` that rolls over to a new file /// when the data size exceeds a target threshold. #[derive(Clone)] -pub struct RollingDataFileWriterBuilder { +pub struct RollingFileWriterBuilder { inner_builder: B, - target_size: u64, + target_size: usize, } -impl RollingDataFileWriterBuilder { - /// Creates a new `RollingDataFileWriterBuilder` with the specified inner builder and target size. +impl RollingFileWriterBuilder { + /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size. /// /// # Arguments /// /// * `inner_builder` - The builder for the underlying file writer /// * `target_size` - The target size in bytes before rolling over to a new file - pub fn new(inner_builder: B, target_size: u64) -> Self { + pub fn new(inner_builder: B, target_size: usize) -> Self { Self { inner_builder, target_size, @@ -67,16 +47,14 @@ impl RollingDataFileWriterBuilder { } } -#[async_trait] -impl IcebergWriterBuilder for RollingDataFileWriterBuilder { - type R = RollingDataFileWriter; +impl FileWriterBuilder for RollingFileWriterBuilder { + type R = RollingFileWriter; async fn build(self) -> Result { - Ok(RollingDataFileWriter { + Ok(RollingFileWriter { inner: None, inner_builder: self.inner_builder, target_size: self.target_size, - written_size: 0, close_handles: vec![], }) } @@ -85,37 +63,51 @@ impl IcebergWriterBuilder for RollingDataFileWriterBuil /// A writer that automatically rolls over to a new file when the data size /// exceeds a target threshold. /// -/// This writer wraps another file writer and tracks the amount of data written. +/// This writer wraps another file writer that tracks the amount of data written. /// When the data size exceeds the target size, it closes the current file and /// starts writing to a new one. -pub struct RollingDataFileWriter { +pub struct RollingFileWriter { inner: Option, inner_builder: B, - target_size: u64, - written_size: u64, - close_handles: Vec>>>, + target_size: usize, + close_handles: Vec>>>, } -#[async_trait] -impl IcebergWriter for RollingDataFileWriter { - async fn write(&mut self, input: RecordBatch) -> Result<()> { - let input_size = input.get_array_memory_size() as u64; - if self.should_roll(input_size) { - if let Some(mut inner) = self.inner.take() { - // close the current writer, roll to a new file - let handle = spawn(async move { inner.close().await }); - self.close_handles.push(handle) - } +impl RollingFileWriter { + /// Determines if the writer should roll over to a new file. 
+ /// + /// # Arguments + /// + /// * `input_size` - The size in bytes of the incoming data + /// + /// # Returns + /// + /// `true` if a new file should be started, `false` otherwise + pub fn should_roll(&self, input_size: usize) -> bool { + self.current_written_size() + input_size > self.target_size + } +} - // clear bytes written - self.written_size = 0; - } +impl FileWriter for RollingFileWriter { + async fn write(&mut self, input: &RecordBatch) -> Result<()> { + let input_size = input.get_array_memory_size(); if self.inner.is_none() { - // start a new writer + // initialize inner writer self.inner = Some(self.inner_builder.clone().build().await?); } + if self.should_roll(input_size) { + if let Some(inner) = self.inner.take() { + // close the current writer, roll to a new file + let handle = spawn(async move { inner.close().await }); + self.close_handles.push(handle); + + // start a new writer + self.inner = Some(self.inner_builder.clone().build().await?); + } + } + // write the input and count bytes written let Some(writer) = self.inner.as_mut() else { return Err(Error::new( @@ -124,29 +116,37 @@ impl IcebergWriter for RollingDataFileWriter { )); }; writer.write(input).await?; - self.written_size += input_size; + Ok(()) } - async fn close(&mut self) -> Result> { - let mut data_files = try_join_all(take(&mut self.close_handles)) + async fn close(self) -> Result> { + let mut data_file_builders = try_join_all(self.close_handles) .await? .into_iter() .flatten() - .collect::>(); + .collect::>(); // close the current writer and merge the output - if let Some(mut current_writer) = take(&mut self.inner) { - data_files.extend(current_writer.close().await?); + if let Some(current_writer) = self.inner { + data_file_builders.extend(current_writer.close().await?); } - Ok(data_files) + Ok(data_file_builders) } } -impl RollingFileWriter for RollingDataFileWriter { - fn should_roll(&mut self, input_size: u64) -> bool { - self.written_size + input_size > self.target_size +impl CurrentFileStatus for RollingFileWriter { + fn current_file_path(&self) -> String { + self.inner.as_ref().unwrap().current_file_path() + } + + fn current_row_num(&self) -> usize { + self.inner.as_ref().unwrap().current_row_num() + } + + fn current_written_size(&self) -> usize { + self.inner.as_ref().unwrap().current_written_size() } } @@ -214,16 +214,17 @@ mod tests { location_gen, file_name_gen, ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); // Set a large target size so no rolling occurs - let rolling_writer_builder = RollingDataFileWriterBuilder::new( - data_file_writer_builder, + let rolling_writer_builder = RollingFileWriterBuilder::new( + parquet_writer_builder, 1024 * 1024, // 1MB, large enough to not trigger rolling ); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0); + // Create writer - let mut writer = rolling_writer_builder.build().await?; + let mut writer = data_file_writer_builder.build().await?; // Create test data let arrow_schema = make_test_arrow_schema(); @@ -272,16 +273,17 @@ mod tests { location_gen, file_name_gen, ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); // Set a very small target size to trigger rolling - let rolling_writer_builder = RollingDataFileWriterBuilder::new( - data_file_writer_builder, + let rolling_writer_builder = RollingFileWriterBuilder::new( + parquet_writer_builder, 100, // Very small target size to ensure rolling ); + let 
data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0); + // Create writer - let mut writer = rolling_writer_builder.build().await?; + let mut writer = data_file_writer_builder.build().await?; // Create test data let arrow_schema = make_test_arrow_schema(); @@ -324,10 +326,6 @@ mod tests { "Expected 9 total records across all files" ); - // Verify each file has the correct content - // Note: We can't easily verify which records went to which file without more complex logic, - // but we can verify the total count and that each file has valid content - Ok(()) } } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 5b39ee06cf..c934edb81b 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -86,7 +86,7 @@ impl ParquetWriterBuilder { impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; - async fn build(self) -> crate::Result { + async fn build(self) -> Result { let written_size = Arc::new(AtomicI64::new(0)); let out_file = self.file_io.new_output( self.location_generator @@ -517,7 +517,7 @@ impl ParquetWriter { } impl FileWriter for ParquetWriter { - async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> { + async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> { // Skip empty batch if batch.num_rows() == 0 { return Ok(()); diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index bb597b3749..c8af4c2c62 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -231,8 +231,8 @@ pub trait IcebergWriter: Send + 'static { async fn close(&mut self) -> Result; } -/// The current file status of iceberg writer. It implement for the writer which write a single -/// file. +/// The current file status of the Iceberg writer. +/// This is implemented for writers that write a single file at a time. pub trait CurrentFileStatus { /// Get the current file path. fn current_file_path(&self) -> String; diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 6dab34d58a..a9a807133e 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -20,5 +20,5 @@ # # The channel is exactly same day for our MSRV. [toolchain] -channel = "nightly-2025-05-20" +channel = "nightly-2025-02-20" components = ["rustfmt", "clippy"] From 93790afa871a0714b139d86e6f15be73f11cfbc7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 22 Jul 2025 17:49:16 -0700 Subject: [PATCH 09/18] clean roll --- crates/iceberg/src/writer/base_writer/mod.rs | 2 -- .../src/writer/file_writer/location_generator.rs | 2 +- crates/iceberg/src/writer/file_writer/mod.rs | 2 ++ .../rolling_writer.rs | 4 ++-- crates/iceberg/src/writer/mod.rs | 15 ++++++++------- 5 files changed, 13 insertions(+), 12 deletions(-) rename crates/iceberg/src/writer/{base_writer => file_writer}/rolling_writer.rs (99%) diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 80770b75ea..37ab97eb6d 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,5 +19,3 @@ pub mod data_file_writer; pub mod equality_delete_writer; -/// Module providing writers that can automatically roll over to new files based on size thresholds. 
-pub mod rolling_writer; diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index 0c0032c4f7..61312e33d8 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -26,7 +26,7 @@ use crate::spec::{DataFileFormat, TableMetadata}; /// `LocationGenerator` used to generate the location of data file. pub trait LocationGenerator: Clone + Send + 'static { /// Generate an absolute path for the given file name. - /// e.g + /// e.g. /// For file name "part-00000.parquet", the generated location maybe "/table/data/part-00000.parquet" fn generate_location(&self, file_name: &str) -> String; } diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index dbf747ec12..d655a1cd06 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -29,6 +29,8 @@ pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; mod track_writer; pub mod location_generator; +/// Module providing writers that can automatically roll over to new files based on size thresholds. +pub mod rolling_writer; type DefaultOutput = Vec; diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs similarity index 99% rename from crates/iceberg/src/writer/base_writer/rolling_writer.rs rename to crates/iceberg/src/writer/file_writer/rolling_writer.rs index e9de42ab57..4c71ae56f3 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -102,13 +102,13 @@ impl FileWriter for RollingFileWriter { // close the current writer, roll to a new file let handle = spawn(async move { inner.close().await }); self.close_handles.push(handle); - + // start a new writer self.inner = Some(self.inner_builder.clone().build().await?); } } - // write the input and count bytes written + // write the input let Some(writer) = self.inner.as_mut() else { return Err(Error::new( ErrorKind::Unexpected, diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index c8af4c2c62..2076d7707d 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -22,11 +22,11 @@ //! 2. IcebergWriter: writer for logical format provided by iceberg table (Such as data file, equality delete file, position delete file) //! or other function (Such as partition writer, delta writer). //! -//! The IcebergWriter will use FileWriter to write underly physical file. +//! The IcebergWriter will use the inner FileWriter to write physical files. //! -//! We hope the writer interface can be extensible and flexible. Each writer can be create config independently -//! and combined together to build a writer which have complex write logic. E.g. combine `FanoutPartitionWriter`, `DataFileWriter`, `ParquetWriter` to get -//! a writer can split the data automatelly according to partition and write down as parquet physical format. +//! The writer interface is designed to be extensible and flexible. Writers can be independently configured +//! and composed to support complex write logic. E.g. By combining `FanoutPartitionWriter`, `DataFileWriter`, and `ParquetWriter`, +//! you can build a writer that automatically partitions the data and writes it in the Parquet format. //! //! For this purpose, there are four trait corresponding to these writer: //! 
- IcebergWriterBuilder @@ -34,8 +34,9 @@ //! - FileWriterBuilder //! - FileWriter //! -//! User can create specific writer builder, combine them and build the writer finally. Also user can custom -//! own writer and implement writer trait for them so that the custom writer can integrate with existing writer. (See following example) +//! Users can create specific writer builders, combine them, and build the final writer. +//! They can also define custom writers by implementing the `Writer` trait, +//! allowing seamless integration with existing writers. (See the example below.) //! //! # Simple example for the data file writer used parquet physical format: //! ```rust, no_run @@ -231,7 +232,7 @@ pub trait IcebergWriter: Send + 'static { async fn close(&mut self) -> Result; } -/// The current file status of the Iceberg writer. +/// The current file status of the Iceberg writer. /// This is implemented for writers that write a single file at a time. pub trait CurrentFileStatus { /// Get the current file path. From 732da142179e68e460686cf112c70b236cee679d Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 24 Jul 2025 12:11:24 -0700 Subject: [PATCH 10/18] Update crates/iceberg/src/writer/file_writer/rolling_writer.rs Co-authored-by: Renjie Liu --- crates/iceberg/src/writer/file_writer/rolling_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 4c71ae56f3..133a3f7451 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -29,7 +29,7 @@ use crate::{Error, ErrorKind, Result}; #[derive(Clone)] pub struct RollingFileWriterBuilder { inner_builder: B, - target_size: usize, + target_file_size: usize, } impl RollingFileWriterBuilder { From 75d42fa9eb54e635760a5295b8eaa94271e4d386 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 24 Jul 2025 12:13:11 -0700 Subject: [PATCH 11/18] better naming --- .../iceberg/src/writer/file_writer/rolling_writer.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 133a3f7451..a30cf232a6 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -38,11 +38,11 @@ impl RollingFileWriterBuilder { /// # Arguments /// /// * `inner_builder` - The builder for the underlying file writer - /// * `target_size` - The target size in bytes before rolling over to a new file - pub fn new(inner_builder: B, target_size: usize) -> Self { + /// * `target_file_size` - The target size in bytes before rolling over to a new file + pub fn new(inner_builder: B, target_file_size: usize) -> Self { Self { inner_builder, - target_size, + target_file_size, } } } @@ -54,7 +54,7 @@ impl FileWriterBuilder for RollingFileWriterBuilder { Ok(RollingFileWriter { inner: None, inner_builder: self.inner_builder, - target_size: self.target_size, + target_file_size: self.target_file_size, close_handles: vec![], }) } @@ -69,7 +69,7 @@ impl FileWriterBuilder for RollingFileWriterBuilder { pub struct RollingFileWriter { inner: Option, inner_builder: B, - target_size: usize, + target_file_size: usize, close_handles: Vec>>>, } @@ -84,7 +84,7 @@ impl RollingFileWriter { /// /// `true` if a new file should be started, `false` otherwise pub fn should_roll(&self, input_size: 
usize) -> bool { - self.current_written_size() + input_size > self.target_size + self.current_written_size() + input_size > self.target_file_size } } From 822383061424943a42fea32e3182868a15cdce96 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 24 Jul 2025 12:21:35 -0700 Subject: [PATCH 12/18] explain the size --- crates/iceberg/src/writer/file_writer/rolling_writer.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index a30cf232a6..efeaa00d69 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -39,6 +39,9 @@ impl RollingFileWriterBuilder { /// /// * `inner_builder` - The builder for the underlying file writer /// * `target_file_size` - The target size in bytes before rolling over to a new file + /// + /// NOTE: The `target_file_size` does not exactly reflect the final size on physical storage. + /// This is because the input size is based on the Arrow in-memory format, which differs from the on-disk file format. pub fn new(inner_builder: B, target_file_size: usize) -> Self { Self { inner_builder, @@ -90,6 +93,8 @@ impl RollingFileWriter { impl FileWriter for RollingFileWriter { async fn write(&mut self, input: &RecordBatch) -> Result<()> { + // The input size is estimated using the Arrow in-memory format + // and will differ from the final on-disk file size. let input_size = input.get_array_memory_size(); if self.inner.is_none() { From 1c4293ac745798a75aee3089a35533f176ba15fe Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 24 Jul 2025 12:43:20 -0700 Subject: [PATCH 13/18] remove close_handles for now --- .../src/writer/file_writer/rolling_writer.rs | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index efeaa00d69..d5ce97cb6d 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -16,9 +16,7 @@ // under the License. use arrow_array::RecordBatch; -use futures::future::try_join_all; -use crate::runtime::{JoinHandle, spawn}; use crate::spec::DataFileBuilder; use crate::writer::CurrentFileStatus; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; @@ -58,7 +56,7 @@ impl FileWriterBuilder for RollingFileWriterBuilder { inner: None, inner_builder: self.inner_builder, target_file_size: self.target_file_size, - close_handles: vec![], + data_file_builders: vec![], }) } } @@ -73,7 +71,7 @@ pub struct RollingFileWriter { inner: Option, inner_builder: B, target_file_size: usize, - close_handles: Vec>>>, + data_file_builders: Vec, } impl RollingFileWriter { @@ -105,8 +103,7 @@ impl FileWriter for RollingFileWriter { if self.should_roll(input_size) { if let Some(inner) = self.inner.take() { // close the current writer, roll to a new file - let handle = spawn(async move { inner.close().await }); - self.close_handles.push(handle); + self.data_file_builders.extend(inner.close().await?); // start a new writer self.inner = Some(self.inner_builder.clone().build().await?); @@ -125,19 +122,14 @@ impl FileWriter for RollingFileWriter { Ok(()) } - async fn close(self) -> Result> { - let mut data_file_builders = try_join_all(self.close_handles) - .await? 
- .into_iter() - .flatten() - .collect::>(); - + async fn close(mut self) -> Result> { // close the current writer and merge the output if let Some(current_writer) = self.inner { - data_file_builders.extend(current_writer.close().await?); + self.data_file_builders + .extend(current_writer.close().await?); } - Ok(data_file_builders) + Ok(self.data_file_builders) } } From 5346e6976b67e038fe1f2071ea0410c7771d49d6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 24 Jul 2025 15:03:03 -0700 Subject: [PATCH 14/18] syntax fix --- crates/iceberg/src/writer/file_writer/rolling_writer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index d5ce97cb6d..6c620f9871 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -111,13 +111,14 @@ impl FileWriter for RollingFileWriter { } // write the input - let Some(writer) = self.inner.as_mut() else { + if let Some(writer) = self.inner.as_mut() { + writer.write(input).await?; + } else { return Err(Error::new( ErrorKind::Unexpected, "Writer is not initialized!", )); }; - writer.write(input).await?; Ok(()) } From 46a03ab73dcd63d6fa3acbb81726f7d538a7bc1e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 24 Jul 2025 15:05:12 -0700 Subject: [PATCH 15/18] more syntax fix --- .../iceberg/src/writer/file_writer/rolling_writer.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 6c620f9871..7406ccde05 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -112,15 +112,13 @@ impl FileWriter for RollingFileWriter { // write the input if let Some(writer) = self.inner.as_mut() { - writer.write(input).await?; + Ok(writer.write(input).await?) } else { - return Err(Error::new( + Err(Error::new( ErrorKind::Unexpected, "Writer is not initialized!", - )); - }; - - Ok(()) + )) + } } async fn close(mut self) -> Result> { From ac22f27afc5bcbc644ce4e63b6a04662adf8592c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 25 Jul 2025 11:46:23 -0700 Subject: [PATCH 16/18] conservatively rolling --- .../src/writer/file_writer/rolling_writer.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 7406ccde05..b7d6ce0ae7 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -39,7 +39,8 @@ impl RollingFileWriterBuilder { /// * `target_file_size` - The target size in bytes before rolling over to a new file /// /// NOTE: The `target_file_size` does not exactly reflect the final size on physical storage. - /// This is because the input size is based on the Arrow in-memory format, which differs from the on-disk file format. + /// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior. + /// The actual file size on disk is expected to be slightly larger than `target_file_size`. 
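+    /// Rolling is evaluated before each incoming batch using the size reported by the
+    /// inner writer, so a file is only closed after it has already crossed the threshold,
+    /// typically overshooting it by no more than the last record batch.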
pub fn new(inner_builder: B, target_file_size: usize) -> Self { Self { inner_builder, @@ -77,30 +78,22 @@ pub struct RollingFileWriter { impl RollingFileWriter { /// Determines if the writer should roll over to a new file. /// - /// # Arguments - /// - /// * `input_size` - The size in bytes of the incoming data - /// /// # Returns /// /// `true` if a new file should be started, `false` otherwise - pub fn should_roll(&self, input_size: usize) -> bool { - self.current_written_size() + input_size > self.target_file_size + pub fn should_roll(&self) -> bool { + self.current_written_size() > self.target_file_size } } impl FileWriter for RollingFileWriter { async fn write(&mut self, input: &RecordBatch) -> Result<()> { - // The input size is estimated using the Arrow in-memory format - // and will differ from the final on-disk file size. - let input_size = input.get_array_memory_size(); - if self.inner.is_none() { // initialize inner writer self.inner = Some(self.inner_builder.clone().build().await?); } - if self.should_roll(input_size) { + if self.should_roll() { if let Some(inner) = self.inner.take() { // close the current writer, roll to a new file self.data_file_builders.extend(inner.close().await?); From 37bf054a25d4f369f1cd454c517d5c812f6431c6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 28 Jul 2025 15:47:12 -0700 Subject: [PATCH 17/18] should roll should be private --- crates/iceberg/src/writer/file_writer/rolling_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index b7d6ce0ae7..9346d39123 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -81,7 +81,7 @@ impl RollingFileWriter { /// # Returns /// /// `true` if a new file should be started, `false` otherwise - pub fn should_roll(&self) -> bool { + fn should_roll(&self) -> bool { self.current_written_size() > self.target_file_size } } From 5b84d8a8890548e1e9e1d3f88339101236ebdd4b Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 28 Jul 2025 16:11:27 -0700 Subject: [PATCH 18/18] fix ut --- .../src/writer/file_writer/rolling_writer.rs | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 9346d39123..93fa975ce1 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -144,10 +144,11 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - use arrow_array::{Int32Array, StringArray}; + use arrow_array::{ArrayRef, Int32Array, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::file::properties::WriterProperties; + use rand::prelude::IteratorRandom; use tempfile::TempDir; use super::*; @@ -264,10 +265,7 @@ mod tests { ); // Set a very small target size to trigger rolling - let rolling_writer_builder = RollingFileWriterBuilder::new( - parquet_writer_builder, - 100, // Very small target size to ensure rolling - ); + let rolling_writer_builder = RollingFileWriterBuilder::new(parquet_writer_builder, 1024); let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0); @@ -276,43 +274,50 @@ mod tests { // Create test data let arrow_schema = make_test_arrow_schema(); + let 
arrow_schema_ref = Arc::new(arrow_schema.clone()); - // Create multiple batches to trigger rolling - let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), - ])?; + let names = vec![ + "Alice", "Bob", "Charlie", "Dave", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy", + "Kelly", "Larry", "Mallory", "Shawn", + ]; - let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ - Arc::new(Int32Array::from(vec![4, 5, 6])), - Arc::new(StringArray::from(vec!["Dave", "Eve", "Frank"])), - ])?; + let mut rng = rand::thread_rng(); + let batch_num = 10; + let batch_rows = 100; + let expected_rows = batch_num * batch_rows; - let batch3 = RecordBatch::try_new(Arc::new(arrow_schema), vec![ - Arc::new(Int32Array::from(vec![7, 8, 9])), - Arc::new(StringArray::from(vec!["Grace", "Heidi", "Ivan"])), - ])?; + for i in 0..batch_num { + let int_values: Vec = (0..batch_rows).map(|row| i * batch_rows + row).collect(); + let str_values: Vec<&str> = (0..batch_rows) + .map(|_| *names.iter().choose(&mut rng).unwrap()) + .collect(); - // Write data - writer.write(batch1.clone()).await?; - writer.write(batch2.clone()).await?; - writer.write(batch3.clone()).await?; + let int_array = Arc::new(Int32Array::from(int_values)) as ArrayRef; + let str_array = Arc::new(StringArray::from(str_values)) as ArrayRef; + + let batch = + RecordBatch::try_new(Arc::clone(&arrow_schema_ref), vec![int_array, str_array]) + .expect("Failed to create RecordBatch"); + + writer.write(batch).await?; + } // Close writer and get data files let data_files = writer.close().await?; - // Verify multiple files were created (at least 2) + // Verify multiple files were created (at least 4) assert!( - data_files.len() > 1, - "Expected multiple data files to be created, got {}", + data_files.len() > 4, + "Expected at least 4 data files to be created, but got {}", data_files.len() ); // Verify total record count across all files let total_records: u64 = data_files.iter().map(|file| file.record_count).sum(); assert_eq!( - total_records, 9, - "Expected 9 total records across all files" + total_records, expected_rows as u64, + "Expected {} total records across all files", + expected_rows ); Ok(())