diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 4376c8463..2666b831b 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -40,7 +40,7 @@ lazy_static! { .expect("metric can be created"); pub static ref STORAGE_SIZE: IntGaugeVec = IntGaugeVec::new( Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), - &["stream", "format"] + &["type", "stream", "format"] ) .expect("metric can be created"); pub static ref STAGING_FILES: IntGaugeVec = IntGaugeVec::new( @@ -117,7 +117,7 @@ pub fn load_from_global_stats() { .with_label_values(&[&stream_name, "json"]) .set(stats.ingestion as i64); STORAGE_SIZE - .with_label_values(&[&stream_name, "parquet"]) + .with_label_values(&["data", &stream_name, "parquet"]) .set(stats.storage as i64) } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index e42ac895a..8b210e3d9 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -268,6 +268,15 @@ pub trait ObjectStorage: Sync + 'static { .with_label_values(&[stream]) .set(files.len() as i64); + for file in &files { + let file_size = file.metadata().unwrap().len(); + let file_type = file.extension().unwrap().to_str().unwrap(); + + STORAGE_SIZE + .with_label_values(&["staging", stream, file_type]) + .add(file_size as i64); + } + let record_reader = MergedRecordReader::try_new(&files).unwrap(); let mut parquet_table = CACHED_FILES.lock().unwrap(); @@ -355,7 +364,7 @@ pub trait ObjectStorage: Sync + 'static { let stats = STREAM_INFO.read().unwrap().get(stream).map(|metadata| { metadata.stats.add_storage_size(compressed_size); STORAGE_SIZE - .with_label_values(&[stream, "parquet"]) + .with_label_values(&["data", stream, "parquet"]) .add(compressed_size as i64); Stats::from(&metadata.stats) });