Skip to content

Commit 3f057b8

Browse files
committed
feat: add endpoint on Query Server to fetch the cluster info
on hitting {url}/api/v1/cluster/info it would return a json response with the information on the cluster Return JSON looks like ``` [ { domain_name: "<domain_name>:<port>", reachable: bool, error: string error message, status: string }, . . . }
1 parent b663831 commit 3f057b8

File tree

2 files changed

+78
-10
lines changed

2 files changed

+78
-10
lines changed

server/src/handlers/http.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,7 @@ pub(crate) fn cross_origin_config() -> Cors {
5151
Cors::default().block_on_origin_mismatch(false)
5252
}
5353
}
54+
55+
pub fn base_path_without_preceding_slash() -> String {
56+
base_path().trim_start_matches('/').to_string()
57+
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
*/
1818

1919
use crate::handlers::http::logstream::error::StreamError;
20-
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
20+
use crate::handlers::http::{
21+
base_path, base_path_without_preceding_slash, cross_origin_config, API_BASE_PATH, API_VERSION,
22+
};
2123
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
2224
use actix_web::http::header;
23-
use actix_web::web;
2425
use actix_web::web::ServiceConfig;
26+
use actix_web::{web, Responder};
2527
use actix_web::{App, HttpServer};
2628
use async_trait::async_trait;
2729
use chrono::{DateTime, Utc};
@@ -135,11 +137,17 @@ impl QueryServer {
135137
.service(Server::get_user_webscope())
136138
.service(Server::get_llm_webscope())
137139
.service(Server::get_oauth_webscope(oidc_client))
138-
.service(Server::get_user_role_webscope()),
140+
.service(Server::get_user_role_webscope())
141+
.service(Self::get_cluster_info_web_scope()),
139142
)
140143
.service(Server::get_generated());
141144
}
142145

146+
fn get_cluster_info_web_scope() -> actix_web::Scope {
147+
web::scope("/cluster")
148+
.service(web::resource("/info").route(web::get().to(Self::get_cluster_info)))
149+
}
150+
143151
// update the .query.json file and return the new IngesterMetadataArr
144152
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
145153
let store = CONFIG.storage().get_object_store();
@@ -163,7 +171,7 @@ impl QueryServer {
163171
}
164172

165173
pub async fn check_liveness(domain_name: &str) -> bool {
166-
let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap();
174+
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();
167175

168176
let reqw = reqwest::Client::new()
169177
.get(uri)
@@ -174,6 +182,38 @@ impl QueryServer {
174182
reqw.is_ok()
175183
}
176184

185+
async fn get_cluster_info() -> Result<impl Responder, StreamError> {
186+
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
187+
log::error!("Fatal: failed to get ingester info: {:?}", err);
188+
StreamError::Custom {
189+
msg: format!("failed to get ingester info\n{:?}", err),
190+
status: StatusCode::INTERNAL_SERVER_ERROR,
191+
}
192+
})?;
193+
194+
let mut infos = vec![];
195+
196+
for ingester in ingester_infos {
197+
let uri = Url::parse(&format!("{}liveness", ingester.domain_name))
198+
.expect("should always be a valid url");
199+
200+
let reqw = reqwest::Client::new()
201+
.get(uri)
202+
.header(header::CONTENT_TYPE, "application/json")
203+
.send()
204+
.await;
205+
206+
infos.push(ClusterInfo::new(
207+
&ingester.domain_name,
208+
reqw.is_ok(),
209+
reqw.as_ref().err().map(|e| e.to_string()),
210+
reqw.ok().map(|r| r.status().to_string()),
211+
));
212+
}
213+
214+
Ok(actix_web::HttpResponse::Ok().json(infos))
215+
}
216+
177217
/// initialize the server, run migrations as needed and start the server
178218
async fn initialize(&self) -> anyhow::Result<()> {
179219
migration::run_metadata_migration(&CONFIG).await?;
@@ -254,8 +294,8 @@ impl QueryServer {
254294
for ingester in ingester_infos.iter() {
255295
let url = format!(
256296
"{}{}/logstream/{}",
257-
ingester.domain_name.to_string().trim_end_matches('/'),
258-
base_path(),
297+
ingester.domain_name,
298+
base_path_without_preceding_slash(),
259299
stream_name
260300
);
261301

@@ -272,8 +312,8 @@ impl QueryServer {
272312
for ingester in ingester_infos {
273313
let url = format!(
274314
"{}{}/logstream/{}",
275-
ingester.domain_name.to_string().trim_end_matches('/'),
276-
base_path(),
315+
ingester.domain_name,
316+
base_path_without_preceding_slash(),
277317
stream_name
278318
);
279319

@@ -306,8 +346,8 @@ impl QueryServer {
306346
for ingester in ingester_infos {
307347
let url = format!(
308348
"{}{}/logstream/{}/stats",
309-
ingester.domain_name.to_string().trim_end_matches('/'),
310-
base_path(),
349+
ingester.domain_name,
350+
base_path_without_preceding_slash(),
311351
stream_name
312352
);
313353

@@ -615,3 +655,27 @@ impl StorageStats {
615655
}
616656
}
617657
}
658+
659+
#[derive(Debug, Default, Serialize, Deserialize)]
660+
struct ClusterInfo {
661+
domain_name: String,
662+
reachable: bool,
663+
error: Option<String>, // error message if the ingester is not reachable
664+
status: Option<String>, // status message if the ingester is reachable
665+
}
666+
667+
impl ClusterInfo {
668+
fn new(
669+
domain_name: &str,
670+
reachable: bool,
671+
error: Option<String>,
672+
status: Option<String>,
673+
) -> Self {
674+
Self {
675+
domain_name: domain_name.to_string(),
676+
reachable,
677+
error,
678+
status,
679+
}
680+
}
681+
}

0 commit comments

Comments
 (0)