Skip to content

Commit d06370c

Browse files
committed
feat: add endpoint on Query Server to fetch the cluster info
on hitting {url}/api/v1/cluster/info it would return a json response with the information on the cluster Return JSON looks like ``` [ { domain_name: "<domain_name>:<port>", reachable: bool, error: string error message, status: string }, . . . }
1 parent b663831 commit d06370c

File tree

2 files changed

+77
-11
lines changed

2 files changed

+77
-11
lines changed

server/src/handlers/http.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,7 @@ pub(crate) fn cross_origin_config() -> Cors {
5151
Cors::default().block_on_origin_mismatch(false)
5252
}
5353
}
54+
55+
pub fn base_path_without_preceding_slash() -> String {
56+
base_path().trim_start_matches('/').to_string()
57+
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
*/
1818

1919
use crate::handlers::http::logstream::error::StreamError;
20-
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
20+
use crate::handlers::http::{base_path, base_path_without_preceding_slash, cross_origin_config, API_BASE_PATH, API_VERSION};
2121
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
2222
use actix_web::http::header;
23-
use actix_web::web;
2423
use actix_web::web::ServiceConfig;
24+
use actix_web::{web, Responder};
2525
use actix_web::{App, HttpServer};
2626
use async_trait::async_trait;
2727
use chrono::{DateTime, Utc};
@@ -135,11 +135,17 @@ impl QueryServer {
135135
.service(Server::get_user_webscope())
136136
.service(Server::get_llm_webscope())
137137
.service(Server::get_oauth_webscope(oidc_client))
138-
.service(Server::get_user_role_webscope()),
139-
)
138+
.service(Server::get_user_role_webscope())
139+
.service(Self::get_cluster_info_web_scope()),
140+
)
140141
.service(Server::get_generated());
141142
}
142143

144+
fn get_cluster_info_web_scope() -> actix_web::Scope {
145+
web::scope("/cluster")
146+
.service(web::resource("/info").route(web::get().to(Self::get_cluster_info)))
147+
}
148+
143149
// update the .query.json file and return the new IngesterMetadataArr
144150
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
145151
let store = CONFIG.storage().get_object_store();
@@ -163,7 +169,7 @@ impl QueryServer {
163169
}
164170

165171
pub async fn check_liveness(domain_name: &str) -> bool {
166-
let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap();
172+
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();
167173

168174
let reqw = reqwest::Client::new()
169175
.get(uri)
@@ -174,6 +180,38 @@ impl QueryServer {
174180
reqw.is_ok()
175181
}
176182

183+
async fn get_cluster_info() -> Result<impl Responder, StreamError> {
184+
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
185+
log::error!("Fatal: failed to get ingester info: {:?}", err);
186+
StreamError::Custom {
187+
msg: format!("failed to get ingester info\n{:?}", err),
188+
status: StatusCode::INTERNAL_SERVER_ERROR,
189+
}
190+
})?;
191+
192+
let mut infos = vec![];
193+
194+
for ingester in ingester_infos {
195+
let uri = Url::parse(&format!("{}liveness", ingester.domain_name))
196+
.expect("should always be a valid url");
197+
198+
let reqw = reqwest::Client::new()
199+
.get(uri)
200+
.header(header::CONTENT_TYPE, "application/json")
201+
.send()
202+
.await;
203+
204+
infos.push(ClusterInfo::new(
205+
&ingester.domain_name,
206+
reqw.is_ok(),
207+
reqw.as_ref().err().map(|e| e.to_string()),
208+
reqw.ok().map(|r| r.status().to_string()),
209+
));
210+
}
211+
212+
Ok(actix_web::HttpResponse::Ok().json(infos))
213+
}
214+
177215
/// initialize the server, run migrations as needed and start the server
178216
async fn initialize(&self) -> anyhow::Result<()> {
179217
migration::run_metadata_migration(&CONFIG).await?;
@@ -254,8 +292,8 @@ impl QueryServer {
254292
for ingester in ingester_infos.iter() {
255293
let url = format!(
256294
"{}{}/logstream/{}",
257-
ingester.domain_name.to_string().trim_end_matches('/'),
258-
base_path(),
295+
ingester.domain_name,
296+
base_path_without_preceding_slash(),
259297
stream_name
260298
);
261299

@@ -272,8 +310,8 @@ impl QueryServer {
272310
for ingester in ingester_infos {
273311
let url = format!(
274312
"{}{}/logstream/{}",
275-
ingester.domain_name.to_string().trim_end_matches('/'),
276-
base_path(),
313+
ingester.domain_name,
314+
base_path_without_preceding_slash(),
277315
stream_name
278316
);
279317

@@ -306,8 +344,8 @@ impl QueryServer {
306344
for ingester in ingester_infos {
307345
let url = format!(
308346
"{}{}/logstream/{}/stats",
309-
ingester.domain_name.to_string().trim_end_matches('/'),
310-
base_path(),
347+
ingester.domain_name,
348+
base_path_without_preceding_slash(),
311349
stream_name
312350
);
313351

@@ -615,3 +653,27 @@ impl StorageStats {
615653
}
616654
}
617655
}
656+
657+
#[derive(Debug, Default, Serialize, Deserialize)]
658+
struct ClusterInfo {
659+
domain_name: String,
660+
reachable: bool,
661+
error: Option<String>, // error message if the ingester is not reachable
662+
status: Option<String>, // status message if the ingester is reachable
663+
}
664+
665+
impl ClusterInfo {
666+
fn new(
667+
domain_name: &str,
668+
reachable: bool,
669+
error: Option<String>,
670+
status: Option<String>,
671+
) -> Self {
672+
Self {
673+
domain_name: domain_name.to_string(),
674+
reachable,
675+
error,
676+
status,
677+
}
678+
}
679+
}

0 commit comments

Comments
 (0)