From d572881425d520c57285b273ef53f185f645e4b1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 14 May 2025 04:29:01 -0400 Subject: [PATCH 1/3] fix: date level stats read all stream.json files for a dataset get sum of stats for a given date from the manifest list in snapshot no need to calculate querier / ingestor stats separately --- src/handlers/http/cluster/mod.rs | 2 +- .../http/modal/query/querier_logstream.rs | 24 +++++++------------ src/prism/home/mod.rs | 23 ++++++------------ 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 4b5456806..bedacc25d 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -393,7 +393,7 @@ pub async fn sync_role_update_with_ingestors( .await } -pub fn fetch_daily_stats_from_ingestors( +pub fn fetch_daily_stats( date: &str, stream_meta_list: &[ObjectStoreFormat], ) -> Result { diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index fc2242cdc..73a618c49 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -36,11 +36,10 @@ use crate::{ handlers::http::{ base_path_without_preceding_slash, cluster::{ - self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, - sync_streams_with_ingestors, + self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors, utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, }, - logstream::{error::StreamError, get_stats_date}, + logstream::error::StreamError, modal::{NodeMetadata, NodeType}, }, hottier::HotTierManager, @@ -154,8 +153,6 @@ pub async fn get_stats( } if !date_value.is_empty() { - let querier_stats = get_stats_date(&stream_name, date_value).await?; - // this function requires all the ingestor stream jsons let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]); let obs = PARSEABLE @@ -163,13 +160,11 @@ pub async fn get_stats( .get_object_store() .get_objects( Some(&path), - Box::new(|file_name| { - file_name.starts_with(".ingestor") && file_name.ends_with("stream.json") - }), + Box::new(|file_name| file_name.ends_with("stream.json")), ) .await?; - let mut ingestor_stream_jsons = Vec::new(); + let mut stream_jsons = Vec::new(); for ob in obs { let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) { Ok(d) => d, @@ -178,16 +173,15 @@ pub async fn get_stats( continue; } }; - ingestor_stream_jsons.push(stream_metadata); + stream_jsons.push(stream_metadata); } - let ingestor_stats = - fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?; + let stats = fetch_daily_stats(date_value, &stream_jsons)?; let total_stats = Stats { - events: querier_stats.events + ingestor_stats.events, - ingestion: querier_stats.ingestion + ingestor_stats.ingestion, - storage: querier_stats.storage + ingestor_stats.storage, + events: stats.events, + ingestion: stats.ingestion, + storage: stats.storage, }; let stats = serde_json::to_value(total_stats)?; diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 82e24eb9d..b16f81f20 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -30,10 +30,7 @@ use crate::{ alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS}, correlation::{CorrelationError, CORRELATIONS}, event::format::LogSource, - handlers::http::{ - cluster::fetch_daily_stats_from_ingestors, - logstream::{error::StreamError, get_stats_date}, - }, + handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError}, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, @@ -221,9 +218,9 @@ async fn stats_for_date( }; // Process each stream concurrently - let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| { - get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone()) - }); + let stream_stats_futures = stream_wise_meta + .values() + .map(|meta| get_stream_stats_for_date(date.clone(), meta.clone())); let stream_stats_results = futures::future::join_all(stream_stats_futures).await; @@ -246,18 +243,12 @@ async fn stats_for_date( } async fn get_stream_stats_for_date( - stream: String, date: String, meta: Vec, ) -> Result<(u64, u64, u64), PrismHomeError> { - let querier_stats = get_stats_date(&stream, &date).await?; - let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?; - - Ok(( - querier_stats.events + ingestor_stats.events, - querier_stats.ingestion + ingestor_stats.ingestion, - querier_stats.storage + ingestor_stats.storage, - )) + let stats = fetch_daily_stats(&date, &meta)?; + + Ok((stats.events, stats.ingestion, stats.storage)) } pub async fn generate_home_search_response( From 7f3e7528850525d17a1d4b66ab154d03038e6fa9 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 14 May 2025 06:02:27 -0400 Subject: [PATCH 2/3] refactor --- src/handlers/http/modal/query/querier_logstream.rs | 9 ++------- src/prism/home/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 73a618c49..25ffe789d 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -44,7 +44,7 @@ use crate::{ }, hottier::HotTierManager, parseable::{StreamNotFound, PARSEABLE}, - stats::{self, Stats}, + stats, storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY}, }; @@ -178,12 +178,7 @@ pub async fn get_stats( let stats = fetch_daily_stats(date_value, &stream_jsons)?; - let total_stats = Stats { - events: stats.events, - ingestion: stats.ingestion, - storage: stats.storage, - }; - let stats = serde_json::to_value(total_stats)?; + let stats = serde_json::to_value(stats)?; return Ok((web::Json(stats), StatusCode::OK)); } diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index b16f81f20..c249b2d97 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -220,7 +220,7 @@ async fn stats_for_date( // Process each stream concurrently let stream_stats_futures = stream_wise_meta .values() - .map(|meta| get_stream_stats_for_date(date.clone(), meta.clone())); + .map(|meta| get_stream_stats_for_date(date.clone(), meta)); let stream_stats_results = futures::future::join_all(stream_stats_futures).await; @@ -244,9 +244,9 @@ async fn stats_for_date( async fn get_stream_stats_for_date( date: String, - meta: Vec, + meta: &[ObjectStoreFormat], ) -> Result<(u64, u64, u64), PrismHomeError> { - let stats = fetch_daily_stats(&date, &meta)?; + let stats = fetch_daily_stats(&date, meta)?; Ok((stats.events, stats.ingestion, stats.storage)) } From 533890a8a30ea931c9c7cb18045ac3acedd215f3 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 14 May 2025 06:25:12 -0400 Subject: [PATCH 3/3] use web query for date stats --- src/handlers/http/logstream.rs | 3 +++ .../http/modal/query/querier_logstream.rs | 24 +++++++++---------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index b5d65be5d..18cc0f074 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -524,6 +524,8 @@ pub mod error { HotTierValidation(#[from] HotTierValidationError), #[error("{0}")] HotTierError(#[from] HotTierError), + #[error("Invalid query parameter: {0}")] + InvalidQueryParameter(String), } impl actix_web::ResponseError for StreamError { @@ -559,6 +561,7 @@ pub mod error { StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN, StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST, StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR, + StreamError::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 25ffe789d..2be17af23 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -17,7 +17,7 @@ */ use core::str; -use std::fs; +use std::{collections::HashMap, fs}; use actix_web::{ web::{self, Path}, @@ -47,6 +47,7 @@ use crate::{ stats, storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY}, }; +const STATS_DATE_QUERY_PARAM: &str = "date"; pub async fn delete(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); @@ -141,16 +142,13 @@ pub async fn get_stats( return Err(StreamNotFound(stream_name.clone()).into()); } - let query_string = req.query_string(); - if !query_string.is_empty() { - let date_key = query_string.split('=').collect::>()[0]; - let date_value = query_string.split('=').collect::>()[1]; - if date_key != "date" { - return Err(StreamError::Custom { - msg: "Invalid query parameter".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } + let query_map = web::Query::>::from_query(req.query_string()) + .map_err(|_| StreamError::InvalidQueryParameter(STATS_DATE_QUERY_PARAM.to_string()))?; + + if !query_map.is_empty() { + let date_value = query_map.get(STATS_DATE_QUERY_PARAM).ok_or_else(|| { + StreamError::InvalidQueryParameter(STATS_DATE_QUERY_PARAM.to_string()) + })?; if !date_value.is_empty() { // this function requires all the ingestor stream jsons @@ -180,7 +178,7 @@ pub async fn get_stats( let stats = serde_json::to_value(stats)?; - return Ok((web::Json(stats), StatusCode::OK)); + return Ok(web::Json(stats)); } } @@ -227,5 +225,5 @@ pub async fn get_stats( let stats = serde_json::to_value(stats)?; - Ok((web::Json(stats), StatusCode::OK)) + Ok(web::Json(stats)) }