Skip to content

Commit a39b9f7

Browse files
authored
feat: Add cluster info api end point (#699)
* feat: add endpoint on Query Server to fetch the cluster info on hitting {url}/api/v1/cluster/info it would return a json response with the information on the cluster Return JSON looks like ``` [ { domain_name: "<domain_name>:<port>", reachable: bool, error: string error message, status: string }, . . . ] * feat: add auth for /cluster/info endpoint 1. Add Action ListCluster 2. Authorize /cluster/info for ListCluster * fix: get_objects fetch the proper files When in S3 Mode, get_objects was fetching all the files that contained the pattern "ingester".
1 parent b663831 commit a39b9f7

File tree

5 files changed

+99
-17
lines changed

5 files changed

+99
-17
lines changed

server/src/handlers/http.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,7 @@ pub(crate) fn cross_origin_config() -> Cors {
5151
Cors::default().block_on_origin_mismatch(false)
5252
}
5353
}
54+
55+
pub fn base_path_without_preceding_slash() -> String {
56+
base_path().trim_start_matches('/').to_string()
57+
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
*/
1818

1919
use crate::handlers::http::logstream::error::StreamError;
20-
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
20+
use crate::handlers::http::middleware::RouteExt;
21+
use crate::handlers::http::{
22+
base_path, base_path_without_preceding_slash, cross_origin_config, API_BASE_PATH, API_VERSION,
23+
};
24+
use crate::rbac::role::Action;
2125
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
2226
use actix_web::http::header;
23-
use actix_web::web;
2427
use actix_web::web::ServiceConfig;
28+
use actix_web::{web, Responder};
2529
use actix_web::{App, HttpServer};
2630
use async_trait::async_trait;
2731
use chrono::{DateTime, Utc};
@@ -135,11 +139,22 @@ impl QueryServer {
135139
.service(Server::get_user_webscope())
136140
.service(Server::get_llm_webscope())
137141
.service(Server::get_oauth_webscope(oidc_client))
138-
.service(Server::get_user_role_webscope()),
142+
.service(Server::get_user_role_webscope())
143+
.service(Self::get_cluster_info_web_scope()),
139144
)
140145
.service(Server::get_generated());
141146
}
142147

148+
fn get_cluster_info_web_scope() -> actix_web::Scope {
149+
web::scope("/cluster").service(
150+
web::resource("/info").route(
151+
web::get()
152+
.to(Self::get_cluster_info)
153+
.authorize(Action::ListCluster),
154+
),
155+
)
156+
}
157+
143158
// update the .query.json file and return the new IngesterMetadataArr
144159
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
145160
let store = CONFIG.storage().get_object_store();
@@ -163,7 +178,7 @@ impl QueryServer {
163178
}
164179

165180
pub async fn check_liveness(domain_name: &str) -> bool {
166-
let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap();
181+
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();
167182

168183
let reqw = reqwest::Client::new()
169184
.get(uri)
@@ -174,6 +189,38 @@ impl QueryServer {
174189
reqw.is_ok()
175190
}
176191

192+
async fn get_cluster_info() -> Result<impl Responder, StreamError> {
193+
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
194+
log::error!("Fatal: failed to get ingester info: {:?}", err);
195+
StreamError::Custom {
196+
msg: format!("failed to get ingester info\n{:?}", err),
197+
status: StatusCode::INTERNAL_SERVER_ERROR,
198+
}
199+
})?;
200+
201+
let mut infos = vec![];
202+
203+
for ingester in ingester_infos {
204+
let uri = Url::parse(&format!("{}liveness", ingester.domain_name))
205+
.expect("should always be a valid url");
206+
207+
let reqw = reqwest::Client::new()
208+
.get(uri)
209+
.header(header::CONTENT_TYPE, "application/json")
210+
.send()
211+
.await;
212+
213+
infos.push(ClusterInfo::new(
214+
&ingester.domain_name,
215+
reqw.is_ok(),
216+
reqw.as_ref().err().map(|e| e.to_string()),
217+
reqw.ok().map(|r| r.status().to_string()),
218+
));
219+
}
220+
221+
Ok(actix_web::HttpResponse::Ok().json(infos))
222+
}
223+
177224
/// initialize the server, run migrations as needed and start the server
178225
async fn initialize(&self) -> anyhow::Result<()> {
179226
migration::run_metadata_migration(&CONFIG).await?;
@@ -254,8 +301,8 @@ impl QueryServer {
254301
for ingester in ingester_infos.iter() {
255302
let url = format!(
256303
"{}{}/logstream/{}",
257-
ingester.domain_name.to_string().trim_end_matches('/'),
258-
base_path(),
304+
ingester.domain_name,
305+
base_path_without_preceding_slash(),
259306
stream_name
260307
);
261308

@@ -272,8 +319,8 @@ impl QueryServer {
272319
for ingester in ingester_infos {
273320
let url = format!(
274321
"{}{}/logstream/{}",
275-
ingester.domain_name.to_string().trim_end_matches('/'),
276-
base_path(),
322+
ingester.domain_name,
323+
base_path_without_preceding_slash(),
277324
stream_name
278325
);
279326

@@ -306,8 +353,8 @@ impl QueryServer {
306353
for ingester in ingester_infos {
307354
let url = format!(
308355
"{}{}/logstream/{}/stats",
309-
ingester.domain_name.to_string().trim_end_matches('/'),
310-
base_path(),
356+
ingester.domain_name,
357+
base_path_without_preceding_slash(),
311358
stream_name
312359
);
313360

@@ -615,3 +662,27 @@ impl StorageStats {
615662
}
616663
}
617664
}
665+
666+
#[derive(Debug, Default, Serialize, Deserialize)]
667+
struct ClusterInfo {
668+
domain_name: String,
669+
reachable: bool,
670+
error: Option<String>, // error message if the ingester is not reachable
671+
status: Option<String>, // status message if the ingester is reachable
672+
}
673+
674+
impl ClusterInfo {
675+
fn new(
676+
domain_name: &str,
677+
reachable: bool,
678+
error: Option<String>,
679+
status: Option<String>,
680+
) -> Self {
681+
Self {
682+
domain_name: domain_name.to_string(),
683+
reachable,
684+
error,
685+
status,
686+
}
687+
}
688+
}

server/src/rbac/role.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub enum Action {
4444
ListRole,
4545
GetAbout,
4646
QueryLLM,
47+
ListCluster,
4748
All,
4849
}
4950

@@ -108,6 +109,7 @@ impl RoleBuilder {
108109
| Action::PutAlert
109110
| Action::GetAlert
110111
| Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
112+
Action::ListCluster => Permission::Unit(action),
111113
};
112114
perms.push(perm);
113115
}
@@ -215,6 +217,7 @@ pub mod model {
215217
Action::GetAlert,
216218
Action::GetAbout,
217219
Action::QueryLLM,
220+
Action::ListCluster,
218221
],
219222
stream: None,
220223
tag: None,

server/src/storage/object_storage.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,15 +451,23 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
451451

452452
#[inline(always)]
453453
fn schema_path(stream_name: &str) -> RelativePathBuf {
454-
RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])
454+
match CONFIG.parseable.mode {
455+
Mode::Ingest => {
456+
let (ip, port) = get_address();
457+
let file_name = format!(".ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME);
458+
459+
RelativePathBuf::from_iter([stream_name, &file_name])
460+
}
461+
Mode::All | Mode::Query => RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]),
462+
}
455463
}
456464

457465
#[inline(always)]
458466
pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
459467
match &CONFIG.parseable.mode {
460468
Mode::Ingest => {
461469
let (ip, port) = get_address();
462-
let file_name = format!("ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
470+
let file_name = format!(".ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
463471
RelativePathBuf::from_iter([stream_name, &file_name])
464472
}
465473
Mode::Query | Mode::All => {

server/src/storage/s3.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -426,11 +426,7 @@ impl ObjectStorage for S3 {
426426
let mut res = vec![];
427427

428428
while let Some(meta) = list_stream.next().await.transpose()? {
429-
let ingester_file = meta
430-
.location
431-
.filename()
432-
.unwrap_or_default()
433-
.contains("ingester");
429+
let ingester_file = meta.location.filename().unwrap().starts_with("ingester");
434430

435431
if !ingester_file {
436432
continue;

0 commit comments

Comments
 (0)