Skip to content

Commit b928c61

Browse files
committed
support lazy init in parquet writer to avoid creating a file on empty
write
1 parent edf3961 commit b928c61

File tree

1 file changed

+89
-13
lines changed

1 file changed

+89
-13
lines changed

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 89 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -84,24 +84,16 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
8484
type R = ParquetWriter;
8585

8686
async fn build(self) -> crate::Result<Self::R> {
87-
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
8887
let written_size = Arc::new(AtomicI64::new(0));
8988
let out_file = self.file_io.new_output(
9089
self.location_generator
9190
.generate_location(&self.file_name_generator.generate_file_name()),
9291
)?;
93-
let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
94-
let async_writer = AsyncFileWriter::new(inner_writer);
95-
let writer =
96-
AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), Some(self.props))
97-
.map_err(|err| {
98-
Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
99-
.with_source(err)
100-
})?;
10192

10293
Ok(ParquetWriter {
10394
schema: self.schema.clone(),
104-
writer,
95+
inner_writer: None,
96+
writer_properties: self.props,
10597
written_size,
10698
current_row_num: 0,
10799
out_file,
@@ -226,7 +218,8 @@ impl SchemaVisitor for IndexByParquetPathName {
226218
pub struct ParquetWriter {
227219
schema: SchemaRef,
228220
out_file: OutputFile,
229-
writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
221+
inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<TrackWriter>>>,
222+
writer_properties: WriterProperties,
230223
written_size: Arc<AtomicI64>,
231224
current_row_num: usize,
232225
}
@@ -520,8 +513,35 @@ impl ParquetWriter {
520513

521514
impl FileWriter for ParquetWriter {
522515
async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> {
516+
// Skip empty batch
517+
if batch.num_rows() == 0 {
518+
return Ok(());
519+
}
520+
523521
self.current_row_num += batch.num_rows();
524-
self.writer.write(batch).await.map_err(|err| {
522+
523+
// Lazy initialize the writer
524+
let writer = if let Some(writer) = &mut self.inner_writer {
525+
writer
526+
} else {
527+
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
528+
let inner_writer =
529+
TrackWriter::new(self.out_file.writer().await?, self.written_size.clone());
530+
let async_writer = AsyncFileWriter::new(inner_writer);
531+
let writer = AsyncArrowWriter::try_new(
532+
async_writer,
533+
arrow_schema.clone(),
534+
Some(self.writer_properties.clone()),
535+
)
536+
.map_err(|err| {
537+
Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
538+
.with_source(err)
539+
})?;
540+
self.inner_writer = Some(writer);
541+
self.inner_writer.as_mut().unwrap()
542+
};
543+
544+
writer.write(batch).await.map_err(|err| {
525545
Error::new(
526546
ErrorKind::Unexpected,
527547
"Failed to write using parquet writer.",
@@ -532,7 +552,10 @@ impl FileWriter for ParquetWriter {
532552
}
533553

534554
async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
535-
let metadata = self.writer.close().await.map_err(|err| {
555+
let Some(writer) = self.inner_writer else {
556+
return Ok(vec![]);
557+
};
558+
let metadata = writer.close().await.map_err(|err| {
536559
Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err)
537560
})?;
538561

@@ -1538,4 +1561,57 @@ mod tests {
15381561

15391562
Ok(())
15401563
}
1564+
1565+
#[tokio::test]
1566+
async fn test_empty_write() -> Result<()> {
1567+
let temp_dir = TempDir::new().unwrap();
1568+
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1569+
let location_gen =
1570+
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
1571+
let file_name_gen =
1572+
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1573+
1574+
// Test that the file will be created when there is data to write
1575+
let schema = {
1576+
let fields = vec![
1577+
arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1578+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1579+
),
1580+
];
1581+
Arc::new(arrow_schema::Schema::new(fields))
1582+
};
1583+
let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1584+
let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1585+
let mut pw = ParquetWriterBuilder::new(
1586+
WriterProperties::builder().build(),
1587+
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1588+
file_io.clone(),
1589+
location_gen.clone(),
1590+
file_name_gen,
1591+
)
1592+
.build()
1593+
.await?;
1594+
pw.write(&to_write).await?;
1595+
let file_path = pw.out_file.location().to_string();
1596+
pw.close().await.unwrap();
1597+
assert!(file_io.exists(file_path).await.unwrap());
1598+
1599+
// Test that the file will not be created when there is no data to write
1600+
let file_name_gen =
1601+
DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1602+
let pw = ParquetWriterBuilder::new(
1603+
WriterProperties::builder().build(),
1604+
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1605+
file_io.clone(),
1606+
location_gen,
1607+
file_name_gen,
1608+
)
1609+
.build()
1610+
.await?;
1611+
let file_path = pw.out_file.location().to_string();
1612+
pw.close().await.unwrap();
1613+
assert!(!file_io.exists(file_path).await.unwrap());
1614+
1615+
Ok(())
1616+
}
15411617
}

0 commit comments

Comments
 (0)