From c4b67e9b019dbcf8337a6541bdb1831db1f4d675 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 16:16:46 +0530 Subject: [PATCH 1/8] fix: ensure panic safety --- src/parseable/mod.rs | 3 +++ src/parseable/streams.rs | 29 ++++++++++++++++++----------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index eb4dd6761..b211ac944 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -58,6 +58,9 @@ use crate::{ mod staging; mod streams; +/// File extension for arrow files in staging +const ARROW_FILE_EXTENSION: &str = "data.arrows"; + /// Name of a Stream /// NOTE: this used to be a struct, flattened out for simplicity pub type LogStream = String; diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 34bfc4d5a..2304512cc 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -61,15 +61,13 @@ use super::{ writer::Writer, StagingError, }, - LogStream, + LogStream, ARROW_FILE_EXTENSION, }; #[derive(Debug, thiserror::Error)] #[error("Stream not found: {0}")] pub struct StreamNotFound(pub String); -const ARROW_FILE_EXTENSION: &str = "data.arrows"; - pub type StreamRef = Arc<Stream>; /// All state associated with a single logstream in Parseable. 
@@ -486,10 +484,12 @@ impl Stream { } writer.close()?; - if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 { + if part_file.metadata().expect("File was just created").len() + < parquet::file::FOOTER_SIZE as u64 + { error!( - "Invalid parquet file {:?} detected for stream {}, removing it", - &part_path, &self.stream_name + "Invalid parquet file {part_path:?} detected for stream {}, removing it", + &self.stream_name ); remove_file(part_path).unwrap(); } else { @@ -501,15 +501,22 @@ impl Stream { } for file in arrow_files { - // warn!("file-\n{file:?}\n"); - let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); - if remove_file(file.clone()).is_err() { + let file_size = match file.metadata() { + Ok(meta) => meta.len(), + Err(err) => { + warn!( + "Looks like the file ({}) was removed; Error = {err}", + file.display() + ); + continue; + } + }; + if remove_file(&file).is_err() { error!("Failed to delete file. Unstable state"); process::abort() } metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, file_type]) + .with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION]) .sub(file_size as i64); } } From d3df5b05c324b9188e9225f90c6224c6c026fbac Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 16:22:15 +0530 Subject: [PATCH 2/8] refactor: it's just `arrows` --- src/parseable/mod.rs | 2 +- src/parseable/streams.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index b211ac944..60ec06b55 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -59,7 +59,7 @@ mod staging; mod streams; /// File extension for arrow files in staging -const ARROW_FILE_EXTENSION: &str = "data.arrows"; +const ARROW_FILE_EXTENSION: &str = "arrows"; /// Name of a Stream /// NOTE: this used to be a struct, flattened out for simplicity diff --git a/src/parseable/streams.rs 
b/src/parseable/streams.rs index 2304512cc..37be70672 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -154,7 +154,7 @@ impl Stream { hostname.push_str(id); } let filename = format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}", Utc::now().format("%Y%m%dT%H%M"), parsed_timestamp.date(), parsed_timestamp.hour(), @@ -881,7 +881,7 @@ mod tests { ); let expected_path = staging.data_path.join(format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}", Utc::now().format("%Y%m%dT%H%M"), parsed_timestamp.date(), parsed_timestamp.hour(), @@ -916,7 +916,7 @@ mod tests { ); let expected_path = staging.data_path.join(format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}", Utc::now().format("%Y%m%dT%H%M"), parsed_timestamp.date(), parsed_timestamp.hour(), From 0e41597663bb6375fca69b7e79116be872838f7b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 17:30:27 +0530 Subject: [PATCH 3/8] style: messaging --- src/parseable/streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 37be70672..279f4bd7b 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -505,7 +505,7 @@ impl Stream { Ok(meta) => meta.len(), Err(err) => { warn!( - "Looks like the file ({}) was removed; Error = {err}", + "File ({}) not found; Error = {err}", file.display() ); continue; From ff9cddfa719cec58b8c8e621e613048d2b13639b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 21:41:06 +0530 Subject: [PATCH 4/8] don't panic on missing file --- 
src/parseable/staging/reader.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 6df0dc324..429ecf18d 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader}; use arrow_schema::Schema; use byteorder::{LittleEndian, ReadBytesExt}; use itertools::kmerge_by; -use tracing::error; +use tracing::{error, warn}; use crate::{ event::DEFAULT_TIMESTAMP_KEY, @@ -82,11 +82,16 @@ pub struct MergedReverseRecordReader { } impl MergedReverseRecordReader { - pub fn try_new(files: &[PathBuf]) -> Self { - let mut readers = Vec::with_capacity(files.len()); - for file in files { - let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else { - error!("Invalid file detected, ignoring it: {:?}", file); + pub fn try_new(file_paths: &[PathBuf]) -> Self { + let mut readers = Vec::with_capacity(file_paths.len()); + for path in file_paths { + let Ok(file) = File::open(&path) else { + warn!("Error when trying to read file: {path:?}"); + continue; + }; + + let Ok(reader) = get_reverse_reader(file) else { + error!("Invalid file detected, ignoring it: {path:?}"); continue; }; From 3122c01d0696f84d6d432e944215ccda480b5e67 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 21:50:31 +0530 Subject: [PATCH 5/8] ci+refactor: print error --- src/parseable/staging/reader.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 429ecf18d..4ee011729 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -85,14 +85,17 @@ impl MergedReverseRecordReader { pub fn try_new(file_paths: &[PathBuf]) -> Self { let mut readers = Vec::with_capacity(file_paths.len()); for path in file_paths { - let Ok(file) = File::open(&path) else { + let 
Ok(file) = File::open(path) else { warn!("Error when trying to read file: {path:?}"); continue; }; - let Ok(reader) = get_reverse_reader(file) else { - error!("Invalid file detected, ignoring it: {path:?}"); - continue; + let reader = match get_reverse_reader(file) { + Ok(r) => r, + Err(err) => { + error!("Invalid file detected, ignoring it: {path:?}; error = {err}"); + continue; + } }; readers.push(reader); From faacd19550d064b54a39579ca85425d2da526551 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 26 Feb 2025 15:51:56 +0530 Subject: [PATCH 6/8] don't panic on failure to read --- src/storage/object_storage.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 0b5449c0b..0478918cf 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -461,16 +461,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { ) -> Result<Option<Manifest>, ObjectStorageError> { let path = manifest_path(path.as_str()); match self.get_object(&path).await { - Ok(bytes) => Ok(Some( - serde_json::from_slice(&bytes).expect("manifest is valid json"), - )), - Err(err) => { - if matches!(err, ObjectStorageError::NoSuchKey(_)) { - Ok(None) - } else { - Err(err) - } + Ok(bytes) => { + let manifest = serde_json::from_slice(&bytes)?; + Ok(Some(manifest)) } + Err(ObjectStorageError::NoSuchKey(_)) => Ok(None), + Err(err) => Err(err), } } From aad96b0638abe93aac1176992d194243401b4e2f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 26 Feb 2025 21:59:43 +0530 Subject: [PATCH 7/8] fix: early EOF --- src/parseable/staging/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 4ee011729..74fe339fb 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -263,7 +263,7 @@ pub fn get_reverse_reader( messages.push((header, offset, size)); offset += size; } - 
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break, + Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && messages.len() > 0 => break, Err(err) => return Err(err), } } From de2ebff53e3a1dca9d289ad1e6cafa3190e692e9 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 27 Feb 2025 10:55:43 +0530 Subject: [PATCH 8/8] ci: clippy suggestion Signed-off-by: Devdutt Shenoi --- src/parseable/staging/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 74fe339fb..b9dae11e6 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -263,7 +263,7 @@ pub fn get_reverse_reader( messages.push((header, offset, size)); offset += size; } - Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && messages.len() > 0 => break, + Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && !messages.is_empty() => break, Err(err) => return Err(err), } }