Skip to content

Commit ec30a5c

Browse files
authored
fix: stats response (#733)
* fix: stats response * fix: s3 get objects Refactor object storage to filter objects by starts_with_pattern
1 parent 01eb76c commit ec30a5c

File tree

6 files changed

+46
-51
lines changed

6 files changed

+46
-51
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 31 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,22 @@
1818

1919
pub mod utils;
2020

21-
use crate::handlers::http::cluster::utils::{check_liveness, to_url_string};
21+
use crate::handlers::http::cluster::utils::{
22+
check_liveness, to_url_string, IngestionStats, QueriedStats,
23+
};
2224
use crate::handlers::http::ingest::PostError;
2325
use crate::handlers::http::logstream::error::StreamError;
2426
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
2527
use crate::option::CONFIG;
2628

2729
use crate::metrics::prom_utils::Metrics;
2830
use crate::storage::object_storage::ingester_metadata_path;
29-
use crate::storage::ObjectStorageError;
30-
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
31+
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
32+
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
3133
use actix_web::http::header;
3234
use actix_web::{HttpRequest, Responder};
3335
use bytes::Bytes;
36+
use chrono::Utc;
3437
use http::StatusCode;
3538
use itertools::Itertools;
3639
use relative_path::RelativePathBuf;
@@ -39,6 +42,8 @@ use url::Url;
3942

4043
type IngesterMetadataArr = Vec<IngesterMetadata>;
4144

45+
use self::utils::StorageStats;
46+
4247
use super::base_path_without_preceding_slash;
4348

4449
use super::modal::IngesterMetadata;
@@ -108,51 +113,31 @@ pub async fn sync_streams_with_ingesters(
108113
pub async fn fetch_stats_from_ingesters(
109114
stream_name: &str,
110115
) -> Result<Vec<utils::QueriedStats>, StreamError> {
111-
let mut stats = Vec::new();
112-
113-
let ingester_infos = get_ingester_info().await.map_err(|err| {
114-
log::error!("Fatal: failed to get ingester info: {:?}", err);
115-
StreamError::Anyhow(err)
116-
})?;
117-
118-
for ingester in ingester_infos {
119-
let url = format!(
120-
"{}{}/logstream/{}/stats",
121-
ingester.domain_name,
122-
base_path_without_preceding_slash(),
123-
stream_name
124-
);
125-
126-
match utils::send_stats_request(&url, ingester.clone()).await {
127-
Ok(Some(res)) => {
128-
match serde_json::from_str::<utils::QueriedStats>(&res.text().await.unwrap()) {
129-
Ok(stat) => stats.push(stat),
130-
Err(err) => {
131-
log::error!(
132-
"Could not parse stats from ingester: {}\n Error: {:?}",
133-
ingester.domain_name,
134-
err
135-
);
136-
continue;
137-
}
138-
}
139-
}
140-
Ok(None) => {
141-
log::error!("Ingester at {} is not reachable", &ingester.domain_name);
142-
continue;
143-
}
144-
Err(err) => {
145-
log::error!(
146-
"Fatal: failed to fetch stats from ingester: {}\n Error: {:?}",
147-
ingester.domain_name,
148-
err
149-
);
150-
return Err(err);
151-
}
116+
let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
117+
let obs = CONFIG
118+
.storage()
119+
.get_object_store()
120+
.get_objects(Some(&path), ".ingester")
121+
.await?;
122+
let mut ingestion_size = 0u64;
123+
let mut storage_size = 0u64;
124+
let mut count = 0u64;
125+
for ob in obs {
126+
if let Ok(stat) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
127+
count += stat.stats.events;
128+
ingestion_size += stat.stats.ingestion;
129+
storage_size += stat.stats.storage;
152130
}
153131
}
154132

155-
Ok(stats)
133+
let qs = QueriedStats::new(
134+
"",
135+
Utc::now(),
136+
IngestionStats::new(count, format!("{} Bytes", ingestion_size), "json"),
137+
StorageStats::new(format!("{} Bytes", storage_size), "parquet"),
138+
);
139+
140+
Ok(vec![qs])
156141
}
157142

158143
async fn send_stream_sync_request(
@@ -361,7 +346,7 @@ pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
361346

362347
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
363348
let arr = store
364-
.get_objects(Some(&root_path))
349+
.get_objects(Some(&root_path), "ingester")
365350
.await?
366351
.iter()
367352
// this unwrap will most definateley shoot me in the foot later

server/src/handlers/http/cluster/utils.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ impl IngestionStats {
9898

9999
#[derive(Debug, Default, Serialize, Deserialize)]
100100
pub struct StorageStats {
101-
size: String,
102-
format: String,
101+
pub size: String,
102+
pub format: String,
103103
}
104104

105105
impl StorageStats {
@@ -120,7 +120,7 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
120120
// .unwrap(); // should never be None
121121

122122
// get the stream name
123-
let stream_name = stats[0].stream.clone();
123+
let stream_name = stats[1].stream.clone();
124124

125125
// get the first event at
126126
// let min_first_event_at = stats
@@ -198,6 +198,8 @@ pub async fn check_liveness(domain_name: &str) -> bool {
198198
}
199199

200200
/// send a request to the ingester to fetch its stats
201+
/// dead for now
202+
#[allow(dead_code)]
201203
pub async fn send_stats_request(
202204
url: &str,
203205
ingester: IngesterMetadata,

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ impl IngestServer {
251251
let store = CONFIG.storage().get_object_store();
252252
let base_path = RelativePathBuf::from("");
253253
let ingester_metadata = store
254-
.get_objects(Some(&base_path))
254+
.get_objects(Some(&base_path), "ingester")
255255
.await?
256256
.iter()
257257
// this unwrap will most definateley shoot me in the foot later

server/src/storage/localfs.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,11 @@ impl ObjectStorage for LocalFS {
189189
Ok(path_arr)
190190
}
191191

192+
/// currently it is not using the starts_with_pattern
192193
async fn get_objects(
193194
&self,
194195
base_path: Option<&RelativePath>,
196+
_starts_with_pattern: &str,
195197
) -> Result<Vec<Bytes>, ObjectStorageError> {
196198
let time = Instant::now();
197199

server/src/storage/object_storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub trait ObjectStorage: Sync + 'static {
6969
async fn get_objects(
7070
&self,
7171
base_path: Option<&RelativePath>,
72+
starts_with_pattern: &str,
7273
) -> Result<Vec<Bytes>, ObjectStorageError>;
7374
async fn put_object(
7475
&self,

server/src/storage/s3.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ impl ObjectStorage for S3 {
416416
async fn get_objects(
417417
&self,
418418
base_path: Option<&RelativePath>,
419+
starts_with_pattern: &str,
419420
) -> Result<Vec<Bytes>, ObjectStorageError> {
420421
let instant = Instant::now();
421422

@@ -430,7 +431,11 @@ impl ObjectStorage for S3 {
430431
let mut res = vec![];
431432

432433
while let Some(meta) = list_stream.next().await.transpose()? {
433-
let ingester_file = meta.location.filename().unwrap().starts_with("ingester");
434+
let ingester_file = meta
435+
.location
436+
.filename()
437+
.unwrap()
438+
.starts_with(starts_with_pattern);
434439

435440
if !ingester_file {
436441
continue;

0 commit comments

Comments
 (0)