From f59cd619061cdffc7f8989327d4ffcc9e2122150 Mon Sep 17 00:00:00 2001 From: Alex Graham Date: Wed, 25 Jun 2025 14:04:08 -0500 Subject: [PATCH] Feature: Optionally configure consistent chunk sizes for multi-part uploads --- crates/iceberg/src/io/file_io.rs | 49 ++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index a71e1b0f07..06ee98b4b2 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -26,6 +26,13 @@ use url::Url; use super::storage::Storage; use crate::{Error, ErrorKind, Result}; +/// Configuration property for setting the chunk size for IO write operations. +/// +/// This is useful for FileIO operations which may use multipart uploads (e.g. for S3) +/// where consistent chunk sizes of a certain size may be more optimal. Some services +/// like Cloudlare R2 requires all chunk sizes to be consistent except for the last. +pub const IO_CHUNK_SIZE: &str = "io.write.chunk-size"; + /// FileIO implementation, used to manipulate files in underlying storage. /// /// # Note @@ -140,8 +147,27 @@ impl FileIO { op, path, relative_path_pos, + chunk_size: self.get_write_chunk_size()?, }) } + + fn get_write_chunk_size(&self) -> Result> { + match self.builder.props.get(IO_CHUNK_SIZE) { + Some(chunk_size) => { + let parsed_chunk_size = chunk_size.parse::().map_err(|_err| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid {}: Cannot parse to unsigned integer.", + IO_CHUNK_SIZE, + ), + ) + })?; + Ok(Some(parsed_chunk_size)) + } + _ => Ok(None), + } + } } /// Builder for [`FileIO`]. @@ -322,6 +348,8 @@ pub struct OutputFile { path: String, // Relative path of file to uri, starts at [`relative_path_pos`] relative_path_pos: usize, + // Chunk size for write operations to ensure consistent size of multipart chunks + chunk_size: Option, } impl OutputFile { @@ -362,9 +390,11 @@ impl OutputFile { /// /// For one-time writing, use [`Self::write`] instead. pub async fn writer(&self) -> crate::Result> { - Ok(Box::new( - self.op.writer(&self.path[self.relative_path_pos..]).await?, - )) + let mut writer = self.op.writer_with(&self.path[self.relative_path_pos..]); + if let Some(chunk_size) = self.chunk_size { + writer = writer.chunk(chunk_size); + } + Ok(Box::new(writer.await?)) } } @@ -380,6 +410,7 @@ mod tests { use tempfile::TempDir; use super::{FileIO, FileIOBuilder}; + use crate::io::IO_CHUNK_SIZE; fn create_local_file_io() -> FileIO { FileIOBuilder::new_fs_io().build().unwrap() @@ -515,4 +546,16 @@ mod tests { io.delete(&path).await.unwrap(); assert!(!io.exists(&path).await.unwrap()); } + + #[tokio::test] + async fn test_set_chunk_size() { + let io = FileIOBuilder::new("memory") + .with_prop(IO_CHUNK_SIZE, 32 * 1024 * 1024) + .build() + .unwrap(); + + let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap()); + let output_file = io.new_output(&path).unwrap(); + assert_eq!(Some(32 * 1024 * 1024), output_file.chunk_size); + } }