From b25a325486cdf63d96b1fb17fd364da72e5ccbd8 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 16 Jan 2025 16:01:55 +0530 Subject: [PATCH 1/4] rename API `datebin` to `counts` --- src/handlers/http/modal/query_server.rs | 2 +- src/handlers/http/modal/server.rs | 7 ++-- src/handlers/http/query.rs | 24 ++++++------- src/query/mod.rs | 48 ++++++++++++------------- 4 files changed, 40 insertions(+), 41 deletions(-) diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index cc94e0f59..1f375c04b 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -52,7 +52,6 @@ impl ParseableServer for QueryServer { config .service( web::scope(&base_path()) - .service(Server::get_date_bin()) .service(Server::get_correlation_webscope()) .service(Server::get_query_factory()) .service(Server::get_liveness_factory()) @@ -65,6 +64,7 @@ impl ParseableServer for QueryServer { .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) + .service(Server::get_counts_webscope()) .service(Server::get_metrics_webscope()) .service(Self::get_cluster_web_scope()), ) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index b82a9ddf1..f26723361 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -80,7 +80,7 @@ impl ParseableServer for Server { .service(Self::get_llm_webscope()) .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) - .service(Self::get_date_bin()) + .service(Self::get_counts_webscope()) .service(Self::get_metrics_webscope()), ) .service(Self::get_ingest_otel_factory()) @@ -267,9 +267,8 @@ impl Server { ), ) } - pub fn get_date_bin() -> Resource { - web::resource("/datebin") - .route(web::post().to(query::get_date_bin).authorize(Action::Query)) + pub fn get_counts_webscope() -> Resource { + web::resource("/counts").route(web::post().to(query::get_counts).authorize(Action::Query)) } // get the query factory diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index d9201a4d7..4f83e0c04 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -41,7 +41,7 @@ use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; -use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery}; +use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -105,20 +105,20 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result, + counts_request: Json, ) -> Result { let creds = extract_session_key_from_req(&req)?; let permissions = Users.get_permissions(&creds); // does user have access to table? - user_auth_for_query(&permissions, &[date_bin.stream.clone()])?; + user_auth_for_query(&permissions, &[counts_request.stream.clone()])?; - let date_bin_records = date_bin.get_bin_density().await?; + let count_records = counts_request.get_bin_density().await?; - Ok(web::Json(DateBinResponse { - fields: vec!["date_bin_timestamp".into(), "log_count".into()], - records: date_bin_records, + Ok(web::Json(CountsResponse { + fields: vec!["counts_timestamp".into(), "log_count".into()], + records: count_records, })) } diff --git a/src/query/mod.rs b/src/query/mod.rs index ea9361357..cbbba4f92 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -244,33 +244,33 @@ impl Query { } } -/// DateBinRecord +/// Record of counts for a given time bin. #[derive(Debug, Serialize, Clone)] -pub struct DateBinRecord { - pub date_bin_timestamp: String, +pub struct CountsRecord { + pub counts_timestamp: String, pub log_count: u64, } -struct DateBinBounds { +struct TimeBounds { start: DateTime, end: DateTime, } -/// DateBin Request. +/// Request for counts, received from API/SQL query. #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct DateBinRequest { +pub struct CountsRequest { pub stream: String, pub start_time: String, pub end_time: String, pub num_bins: u64, } -impl DateBinRequest { +impl CountsRequest { /// This function is supposed to read maninfest files for the given stream, /// get the sum of `num_rows` between the `startTime` and `endTime`, /// divide that by number of bins and return in a manner acceptable for the console - pub async fn get_bin_density(&self) -> Result, QueryError> { + pub async fn get_bin_density(&self) -> Result, QueryError> { let time_partition = STREAM_INFO .get_time_partition(&self.stream.clone()) .map_err(|err| anyhow::Error::msg(err.to_string()))? @@ -280,18 +280,18 @@ impl DateBinRequest { let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; let all_manifest_files = get_manifest_list(&self.stream, &time_range).await?; // get final date bins - let final_date_bins = self.get_bins(&time_range); + let counts = self.get_bins(&time_range); // we have start and end times for each bin // we also have all the manifest files for the given time range // now we iterate over start and end times for each bin // then we iterate over the manifest files which are within that time range // we sum up the num_rows - let mut date_bin_records = Vec::new(); + let mut counts_records = Vec::new(); - for bin in final_date_bins { + for bin in counts { // extract start and end time to compare - let date_bin_timestamp = bin.start.timestamp_millis(); + let counts_timestamp = bin.start.timestamp_millis(); // Sum up the number of rows that fall within the bin let total_num_rows: u64 = all_manifest_files @@ -315,18 +315,18 @@ impl DateBinRequest { }) .sum(); - date_bin_records.push(DateBinRecord { - date_bin_timestamp: DateTime::from_timestamp_millis(date_bin_timestamp) + counts_records.push(CountsRecord { + counts_timestamp: DateTime::from_timestamp_millis(counts_timestamp) .unwrap() .to_rfc3339(), log_count: total_num_rows, }); } - Ok(date_bin_records) + Ok(counts_records) } /// Calculate the end time for each bin based on the number of bins - fn get_bins(&self, time_range: &TimeRange) -> Vec { + fn get_bins(&self, time_range: &TimeRange) -> Vec { let total_minutes = time_range .end .signed_duration_since(time_range.start) @@ -339,7 +339,7 @@ impl DateBinRequest { // now create multiple bins [startTime, endTime) // Should we exclude the last one??? - let mut final_date_bins = vec![]; + let mut counts = vec![]; let mut start = time_range.start; @@ -352,32 +352,32 @@ impl DateBinRequest { // Create bins for all but the last date for _ in 0..loop_end { let end = start + Duration::minutes(quotient as i64); - final_date_bins.push(DateBinBounds { start, end }); + counts.push(TimeBounds { start, end }); start = end; } // Add the last bin, accounting for any remainder, should we include it? if have_remainder { - final_date_bins.push(DateBinBounds { + counts.push(TimeBounds { start, end: start + Duration::minutes(remainder as i64), }); } else { - final_date_bins.push(DateBinBounds { + counts.push(TimeBounds { start, end: start + Duration::minutes(quotient as i64), }); } - final_date_bins + counts } } -/// DateBin Response. +/// Response for the counts API #[derive(Debug, Serialize, Clone)] -pub struct DateBinResponse { +pub struct CountsResponse { pub fields: Vec, - pub records: Vec, + pub records: Vec, } #[derive(Debug, Default)] From d9c9d393dfa4438c0d619565d90b3ef9b12ff688 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 16 Jan 2025 16:07:09 +0530 Subject: [PATCH 2/4] doc: point out what is happening --- src/handlers/http/query.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 4f83e0c04..a177c966d 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -104,6 +104,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Date: Thu, 16 Jan 2025 16:14:18 +0530 Subject: [PATCH 3/4] doc: explainer --- src/handlers/http/query.rs | 4 ++-- src/query/mod.rs | 30 +++++++++++++++++++----------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index a177c966d..fc2868dfa 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -159,11 +159,11 @@ pub async fn get_counts( // does user have access to table? user_auth_for_query(&permissions, &[counts_request.stream.clone()])?; - let count_records = counts_request.get_bin_density().await?; + let records = counts_request.get_bin_density().await?; Ok(web::Json(CountsResponse { fields: vec!["counts_timestamp".into(), "log_count".into()], - records: count_records, + records, })) } diff --git a/src/query/mod.rs b/src/query/mod.rs index cbbba4f92..5285a45ae 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -247,7 +247,9 @@ impl Query { /// Record of counts for a given time bin. #[derive(Debug, Serialize, Clone)] pub struct CountsRecord { + /// Start time of the bin pub counts_timestamp: String, + /// Number of logs in the bin pub log_count: u64, } @@ -260,9 +262,13 @@ struct TimeBounds { #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct CountsRequest { + /// Name of the stream to get counts for pub stream: String, + /// Included start time for counts query pub start_time: String, + /// Excluded end time for counts query pub end_time: String, + /// Number of bins to divide the time range into pub num_bins: u64, } @@ -279,8 +285,8 @@ impl CountsRequest { // get time range let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; let all_manifest_files = get_manifest_list(&self.stream, &time_range).await?; - // get final date bins - let counts = self.get_bins(&time_range); + // get bounds + let counts = self.get_bounds(&time_range); // we have start and end times for each bin // we also have all the manifest files for the given time range @@ -294,7 +300,7 @@ impl CountsRequest { let counts_timestamp = bin.start.timestamp_millis(); // Sum up the number of rows that fall within the bin - let total_num_rows: u64 = all_manifest_files + let log_count: u64 = all_manifest_files .iter() .flat_map(|m| &m.files) .filter_map(|f| { @@ -319,14 +325,14 @@ impl CountsRequest { counts_timestamp: DateTime::from_timestamp_millis(counts_timestamp) .unwrap() .to_rfc3339(), - log_count: total_num_rows, + log_count, }); } Ok(counts_records) } /// Calculate the end time for each bin based on the number of bins - fn get_bins(&self, time_range: &TimeRange) -> Vec { + fn get_bounds(&self, time_range: &TimeRange) -> Vec { let total_minutes = time_range .end .signed_duration_since(time_range.start) @@ -337,9 +343,9 @@ impl CountsRequest { let remainder = total_minutes % self.num_bins; let have_remainder = remainder > 0; - // now create multiple bins [startTime, endTime) + // now create multiple bounds [startTime, endTime) // Should we exclude the last one??? - let mut counts = vec![]; + let mut bounds = vec![]; let mut start = time_range.start; @@ -352,31 +358,33 @@ impl CountsRequest { // Create bins for all but the last date for _ in 0..loop_end { let end = start + Duration::minutes(quotient as i64); - counts.push(TimeBounds { start, end }); + bounds.push(TimeBounds { start, end }); start = end; } // Add the last bin, accounting for any remainder, should we include it? if have_remainder { - counts.push(TimeBounds { + bounds.push(TimeBounds { start, end: start + Duration::minutes(remainder as i64), }); } else { - counts.push(TimeBounds { + bounds.push(TimeBounds { start, end: start + Duration::minutes(quotient as i64), }); } - counts + bounds } } /// Response for the counts API #[derive(Debug, Serialize, Clone)] pub struct CountsResponse { + /// Fields in the log stream pub fields: Vec, + /// Records in the response pub records: Vec, } From 9e4c7d2aa9e37c35f45e5faf0f95c0c3df8eebfb Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 16 Jan 2025 16:24:09 +0530 Subject: [PATCH 4/4] doc: note why won't panic --- src/handlers/http/query.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index fc2868dfa..e0ede6917 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -113,13 +113,15 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result