From da1c996395436230b353168f53a4446df6daa347 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 7 Feb 2025 14:54:32 +0530 Subject: [PATCH 1/2] fix: ignore corrupt parquet files --- src/staging/streams.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/staging/streams.rs b/src/staging/streams.rs index e950ed6dc..69133b210 100644 --- a/src/staging/streams.rs +++ b/src/staging/streams.rs @@ -34,7 +34,10 @@ use itertools::Itertools; use parquet::{ arrow::ArrowWriter, basic::Encoding, - file::properties::{WriterProperties, WriterPropertiesBuilder}, + file::{ + properties::{WriterProperties, WriterPropertiesBuilder}, + FOOTER_SIZE, + }, format::SortingColumn, schema::types::ColumnPath, }; @@ -218,7 +221,10 @@ impl<'a> Stream<'a> { dir.flatten() .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet"))) + .filter(|file| { + file.extension().is_some_and(|ext| ext.eq("parquet")) + && std::fs::metadata(file).is_ok_and(|meta| meta.len() > FOOTER_SIZE as u64) + }) .collect() } From 09bfbd89f6324518b1bb40bdc26fd481d19b46a5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 7 Feb 2025 15:26:51 +0530 Subject: [PATCH 2/2] fix: first write to `.part` file, then rename to `.parquet` only on success --- src/staging/streams.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/staging/streams.rs b/src/staging/streams.rs index 69133b210..86a533aa1 100644 --- a/src/staging/streams.rs +++ b/src/staging/streams.rs @@ -42,7 +42,7 @@ use parquet::{ schema::types::ColumnPath, }; use rand::distributions::DistString; -use tracing::error; +use tracing::{error, trace}; use crate::{ cli::Options, @@ -356,24 +356,32 @@ impl<'a> Stream<'a> { .build(); schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); - let parquet_file = OpenOptions::new() + let mut part_path = parquet_path.to_owned(); + part_path.set_extension("part"); + let mut part_file = OpenOptions::new() .create(true) .append(true) - .open(&parquet_path) + .open(&part_path) .map_err(|_| StagingError::Create)?; - let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?; + let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?; for ref record in record_reader.merged_iter(schema, time_partition.cloned()) { writer.write(record)?; } - writer.close()?; - if parquet_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 { + + if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 { error!( "Invalid parquet file {:?} detected for stream {}, removing it", - &parquet_path, &self.stream_name + &part_path, &self.stream_name ); - remove_file(parquet_path).unwrap(); + remove_file(part_path).unwrap(); } else { + trace!("Parquet file successfully constructed"); + if let Err(e) = std::fs::rename(&part_path, &parquet_path) { + error!( + "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}" + ); + } for file in arrow_files { // warn!("file-\n{file:?}\n"); let file_size = file.metadata().unwrap().len();