Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ use url::Url;
use super::storage::Storage;
use crate::{Error, ErrorKind, Result};

/// Configuration property for setting the chunk size for IO write operations.
///
/// This is useful for FileIO operations which may use multipart uploads (e.g. for S3)
/// where consistent chunk sizes of a certain size may be more optimal. Some services
/// like Cloudlare R2 requires all chunk sizes to be consistent except for the last.
pub const IO_CHUNK_SIZE: &str = "io.write.chunk-size";

/// FileIO implementation, used to manipulate files in underlying storage.
///
/// # Note
Expand Down Expand Up @@ -140,8 +147,27 @@ impl FileIO {
op,
path,
relative_path_pos,
chunk_size: self.get_write_chunk_size()?,
})
}

fn get_write_chunk_size(&self) -> Result<Option<usize>> {
match self.builder.props.get(IO_CHUNK_SIZE) {
Some(chunk_size) => {
let parsed_chunk_size = chunk_size.parse::<usize>().map_err(|_err| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid {}: Cannot parse to unsigned integer.",
IO_CHUNK_SIZE,
),
)
})?;
Ok(Some(parsed_chunk_size))
}
_ => Ok(None),
}
}
}

/// Builder for [`FileIO`].
Expand Down Expand Up @@ -322,6 +348,8 @@ pub struct OutputFile {
path: String,
// Relative path of file to uri, starts at [`relative_path_pos`]
relative_path_pos: usize,
// Chunk size for write operations to ensure consistent size of multipart chunks
chunk_size: Option<usize>,
}

impl OutputFile {
Expand Down Expand Up @@ -362,9 +390,11 @@ impl OutputFile {
///
/// For one-time writing, use [`Self::write`] instead.
pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
Ok(Box::new(
self.op.writer(&self.path[self.relative_path_pos..]).await?,
))
let mut writer = self.op.writer_with(&self.path[self.relative_path_pos..]);
if let Some(chunk_size) = self.chunk_size {
writer = writer.chunk(chunk_size);
}
Ok(Box::new(writer.await?))
}
}

Expand All @@ -380,6 +410,7 @@ mod tests {
use tempfile::TempDir;

use super::{FileIO, FileIOBuilder};
use crate::io::IO_CHUNK_SIZE;

fn create_local_file_io() -> FileIO {
FileIOBuilder::new_fs_io().build().unwrap()
Expand Down Expand Up @@ -515,4 +546,16 @@ mod tests {
io.delete(&path).await.unwrap();
assert!(!io.exists(&path).await.unwrap());
}

#[tokio::test]
async fn test_set_chunk_size() {
let io = FileIOBuilder::new("memory")
.with_prop(IO_CHUNK_SIZE, 32 * 1024 * 1024)
.build()
.unwrap();

let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());
let output_file = io.new_output(&path).unwrap();
assert_eq!(Some(32 * 1024 * 1024), output_file.chunk_size);
}
}
Loading