From b928c614323f5adfe7315ef986b636a6af9e1e18 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Mon, 3 Mar 2025 17:37:42 +0800
Subject: [PATCH 1/2] support lazy init in parquet writer to avoid creating a
 file on empty write

---
 .../src/writer/file_writer/parquet_writer.rs  | 102 +++++++++++++++---
 1 file changed, 89 insertions(+), 13 deletions(-)

diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 97fd6e6c46..bed9cc3ddc 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -84,24 +84,16 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
     type R = ParquetWriter;
 
     async fn build(self) -> crate::Result<Self::R> {
-        let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
         let written_size = Arc::new(AtomicI64::new(0));
         let out_file = self.file_io.new_output(
             self.location_generator
                 .generate_location(&self.file_name_generator.generate_file_name()),
         )?;
-        let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
-        let async_writer = AsyncFileWriter::new(inner_writer);
-        let writer =
-            AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), Some(self.props))
-                .map_err(|err| {
-                    Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
-                        .with_source(err)
-                })?;
 
         Ok(ParquetWriter {
             schema: self.schema.clone(),
-            writer,
+            inner_writer: None,
+            writer_properties: self.props,
             written_size,
             current_row_num: 0,
             out_file,
@@ -226,7 +218,8 @@ impl SchemaVisitor for IndexByParquetPathName {
 pub struct ParquetWriter {
     schema: SchemaRef,
     out_file: OutputFile,
-    writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
+    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<TrackWriter>>>,
+    writer_properties: WriterProperties,
     written_size: Arc<AtomicI64>,
     current_row_num: usize,
 }
@@ -520,8 +513,35 @@ impl ParquetWriter {
 
 impl FileWriter for ParquetWriter {
     async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> {
+        // Skip empty batches so that an empty write never creates a file
+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
         self.current_row_num += batch.num_rows();
+
+        // Lazily initialize the writer on the first non-empty batch
+        let writer = if let Some(writer) = &mut self.inner_writer {
+            writer
+        } else {
+            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
+            let inner_writer =
+                TrackWriter::new(self.out_file.writer().await?, self.written_size.clone());
+            let async_writer = AsyncFileWriter::new(inner_writer);
+            let writer = AsyncArrowWriter::try_new(
+                async_writer,
+                arrow_schema.clone(),
+                Some(self.writer_properties.clone()),
+            )
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
+                    .with_source(err)
+            })?;
+            self.inner_writer = Some(writer);
+            self.inner_writer.as_mut().unwrap()
+        };
+
-        self.writer.write(batch).await.map_err(|err| {
+        writer.write(batch).await.map_err(|err| {
             Error::new(
                 ErrorKind::Unexpected,
                 "Failed to write using parquet writer.",
@@ -532,7 +552,10 @@ impl FileWriter for ParquetWriter {
     }
 
     async fn close(self) -> crate::Result<Vec<DataFileBuilder>> {
-        let metadata = self.writer.close().await.map_err(|err| {
+        let Some(writer) = self.inner_writer else {
+            return Ok(vec![]);
+        };
+        let metadata = writer.close().await.map_err(|err| {
             Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err)
         })?;
 
@@ -1538,4 +1561,57 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_empty_write() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // Test that a file is created when there is data to write
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
+        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+        let mut pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
+            file_io.clone(),
+            location_gen.clone(),
+            file_name_gen,
+        )
+        .build()
+        .await?;
+        pw.write(&to_write).await?;
+        let file_path = pw.out_file.location().to_string();
+        pw.close().await.unwrap();
+        assert!(file_io.exists(file_path).await.unwrap());
+
+        // Test that no file is created when there is no data to write
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
+        let pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        )
+        .build()
+        .await?;
+        let file_path = pw.out_file.location().to_string();
+        pw.close().await.unwrap();
+        assert!(!file_io.exists(file_path).await.unwrap());
+
+        Ok(())
+    }
 }

From 68327ac34804b507c9156d5ea8b58d94a505b0b9 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Wed, 5 Mar 2025 16:01:52 +0800
Subject: [PATCH 2/2] fix test

---
 .../iceberg/src/writer/base_writer/data_file_writer.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index 940aa15842..e193b4a94c 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -144,6 +144,16 @@ mod test {
 
         let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await.unwrap();
 
+        let arrow_schema = arrow_schema::Schema::new(vec![
+            Field::new("foo", DataType::Int32, false),
+            Field::new("bar", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])?;
+        data_file_writer.write(batch).await?;
+
         let data_files = data_file_writer.close().await.unwrap();
         assert_eq!(data_files.len(), 1);
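
A note for reviewers (not part of the patch): after these two commits, a
ParquetWriter that never receives a non-empty batch creates no output file and
reports no data files on close. A minimal sketch of the resulting contract,
reusing the bindings from test_empty_write above (`schema`, `file_io`,
`location_gen`, `file_name_gen`; error handling elided):

    // Build a writer but feed it only a zero-row batch.
    let mut pw = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        Arc::new(schema.as_ref().try_into().unwrap()),
        file_io.clone(),
        location_gen,
        file_name_gen,
    )
    .build()
    .await?;

    // The `num_rows() == 0` guard in `write` returns early, so the inner
    // AsyncArrowWriter (and hence the output file) is never created.
    pw.write(&RecordBatch::new_empty(schema.clone())).await?;

    // `close` sees `inner_writer == None` and returns an empty Vec instead
    // of flushing a zero-row parquet file to storage.
    assert!(pw.close().await?.is_empty());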