Skip to content

Commit 0a4844e

Browse files
committed
support delete if empty for parquet writer
1 parent f33628e commit 0a4844e

File tree

7 files changed

+26
-0
lines changed

7 files changed

+26
-0
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,11 @@ impl OutputFile {
334334
Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
335335
}
336336

337+
/// Deletes the file if it exists.
338+
pub async fn delete(&self) -> crate::Result<()> {
339+
Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
340+
}
341+
337342
/// Converts into [`InputFile`].
338343
pub fn to_input_file(self) -> InputFile {
339344
InputFile {

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ mod test {
140140
file_io.clone(),
141141
location_gen,
142142
file_name_gen,
143+
false,
143144
);
144145

145146
let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await.unwrap();
@@ -198,6 +199,7 @@ mod test {
198199
file_io.clone(),
199200
location_gen,
200201
file_name_gen,
202+
false,
201203
);
202204

203205
let mut data_file_writer =

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ mod test {
397397
file_io.clone(),
398398
location_gen,
399399
file_name_gen,
400+
false,
400401
);
401402
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, equality_config)
402403
.build()
@@ -561,6 +562,7 @@ mod test {
561562
file_io.clone(),
562563
location_gen,
563564
file_name_gen,
565+
false,
564566
);
565567
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, config)
566568
.build()

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
5454
file_io: FileIO,
5555
location_generator: T,
5656
file_name_generator: F,
57+
58+
delete_if_empty: bool,
5759
}
5860

5961
impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
@@ -65,13 +67,15 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
6567
file_io: FileIO,
6668
location_generator: T,
6769
file_name_generator: F,
70+
delete_if_empty: bool,
6871
) -> Self {
6972
Self {
7073
props,
7174
schema,
7275
file_io,
7376
location_generator,
7477
file_name_generator,
78+
delete_if_empty,
7579
}
7680
}
7781
}
@@ -101,6 +105,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
101105
written_size,
102106
current_row_num: 0,
103107
out_file,
108+
delete_if_empty: self.delete_if_empty,
104109
})
105110
}
106111
}
@@ -216,6 +221,7 @@ pub struct ParquetWriter {
216221
writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
217222
written_size: Arc<AtomicI64>,
218223
current_row_num: usize,
224+
delete_if_empty: bool,
219225
}
220226

221227
/// Used to aggregate min and max value of each column.
@@ -411,6 +417,11 @@ impl FileWriter for ParquetWriter {
411417
Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err)
412418
})?;
413419

420+
if self.delete_if_empty && self.current_row_num == 0 {
421+
self.out_file.delete().await?;
422+
return Ok(vec![]);
423+
}
424+
414425
let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
415426

416427
Ok(vec![Self::to_data_file_builder(
@@ -671,6 +682,7 @@ mod tests {
671682
file_io.clone(),
672683
location_gen,
673684
file_name_gen,
685+
false,
674686
)
675687
.build()
676688
.await?;
@@ -867,6 +879,7 @@ mod tests {
867879
file_io.clone(),
868880
location_gen,
869881
file_name_gen,
882+
false,
870883
)
871884
.build()
872885
.await?;
@@ -1057,6 +1070,7 @@ mod tests {
10571070
file_io.clone(),
10581071
loccation_gen,
10591072
file_name_gen,
1073+
false,
10601074
)
10611075
.build()
10621076
.await?;

crates/integration_tests/tests/append_data_file_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ async fn test_append_data_file() {
9696
table.file_io().clone(),
9797
location_generator.clone(),
9898
file_name_generator.clone(),
99+
false,
99100
);
100101
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
101102
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();

crates/integration_tests/tests/append_partition_data_file_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ async fn test_append_partition_data_file() {
113113
table.file_io().clone(),
114114
location_generator.clone(),
115115
file_name_generator.clone(),
116+
false,
116117
);
117118

118119
let mut data_file_writer_valid = DataFileWriterBuilder::new(

crates/integration_tests/tests/conflict_commit_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ async fn test_append_data_file_conflict() {
9595
table.file_io().clone(),
9696
location_generator.clone(),
9797
file_name_generator.clone(),
98+
false,
9899
);
99100
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
100101
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();

0 commit comments

Comments
 (0)