diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 7d8decdca..8a56707b5 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -444,21 +444,27 @@ impl Stream { .set(0); } + //find sum of arrow files in staging directory for a stream + let total_arrow_files = staging_files.values().map(|v| v.len()).sum::(); + metrics::STAGING_FILES + .with_label_values(&[&self.stream_name]) + .set(total_arrow_files as i64); + + //find sum of file sizes of all arrow files in staging_files + let total_arrow_files_size = staging_files + .values() + .map(|v| { + v.iter() + .map(|file| file.metadata().unwrap().len()) + .sum::() + }) + .sum::(); + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, "arrows"]) + .set(total_arrow_files_size as i64); + // warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, arrow_files) in staging_files { - metrics::STAGING_FILES - .with_label_values(&[&self.stream_name]) - .set(arrow_files.len() as i64); - - for file in &arrow_files { - let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); - - metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, file_type]) - .add(file_size as i64); - } - let record_reader = MergedReverseRecordReader::try_new(&arrow_files); if record_reader.readers.is_empty() { continue; @@ -494,6 +500,7 @@ impl Stream { "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}" ); } + for file in arrow_files { // warn!("file-\n{file:?}\n"); let file_size = file.metadata().unwrap().len(); diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index a6ba5558e..5be60b67e 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -36,7 +36,8 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use tracing::{debug, error, warn}; +use tracing::info; +use tracing::{error, warn}; use ulid::Ulid; use crate::alerts::AlertConfig; @@ -718,7 +719,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { // get all streams for stream_name in PARSEABLE.streams.list() { - debug!("Starting object_store_sync for stream- {stream_name}"); + info!("Starting object_store_sync for stream- {stream_name}"); let stream = PARSEABLE.get_or_create_stream(&stream_name); let custom_partition = stream.get_custom_partition();