1818
1919pub mod utils;
2020
21- use crate :: handlers:: http:: cluster:: utils:: { check_liveness, to_url_string} ;
21+ use crate :: handlers:: http:: cluster:: utils:: {
22+ check_liveness, to_url_string, IngestionStats , QueriedStats ,
23+ } ;
2224use crate :: handlers:: http:: ingest:: PostError ;
2325use crate :: handlers:: http:: logstream:: error:: StreamError ;
2426use crate :: handlers:: { STATIC_SCHEMA_FLAG , TIME_PARTITION_KEY } ;
2527use crate :: option:: CONFIG ;
2628
2729use crate :: metrics:: prom_utils:: Metrics ;
2830use crate :: storage:: object_storage:: ingester_metadata_path;
29- use crate :: storage:: ObjectStorageError ;
30- use crate :: storage:: PARSEABLE_ROOT_DIRECTORY ;
31+ use crate :: storage:: { ObjectStorageError , STREAM_ROOT_DIRECTORY } ;
32+ use crate :: storage:: { ObjectStoreFormat , PARSEABLE_ROOT_DIRECTORY } ;
3133use actix_web:: http:: header;
3234use actix_web:: { HttpRequest , Responder } ;
3335use bytes:: Bytes ;
36+ use chrono:: Utc ;
3437use http:: StatusCode ;
3538use itertools:: Itertools ;
3639use relative_path:: RelativePathBuf ;
@@ -39,6 +42,8 @@ use url::Url;
3942
4043type IngesterMetadataArr = Vec < IngesterMetadata > ;
4144
45+ use self :: utils:: StorageStats ;
46+
4247use super :: base_path_without_preceding_slash;
4348
4449use super :: modal:: IngesterMetadata ;
@@ -108,51 +113,31 @@ pub async fn sync_streams_with_ingesters(
108113pub async fn fetch_stats_from_ingesters (
109114 stream_name : & str ,
110115) -> Result < Vec < utils:: QueriedStats > , StreamError > {
111- let mut stats = Vec :: new ( ) ;
112-
113- let ingester_infos = get_ingester_info ( ) . await . map_err ( |err| {
114- log:: error!( "Fatal: failed to get ingester info: {:?}" , err) ;
115- StreamError :: Anyhow ( err)
116- } ) ?;
117-
118- for ingester in ingester_infos {
119- let url = format ! (
120- "{}{}/logstream/{}/stats" ,
121- ingester. domain_name,
122- base_path_without_preceding_slash( ) ,
123- stream_name
124- ) ;
125-
126- match utils:: send_stats_request ( & url, ingester. clone ( ) ) . await {
127- Ok ( Some ( res) ) => {
128- match serde_json:: from_str :: < utils:: QueriedStats > ( & res. text ( ) . await . unwrap ( ) ) {
129- Ok ( stat) => stats. push ( stat) ,
130- Err ( err) => {
131- log:: error!(
132- "Could not parse stats from ingester: {}\n Error: {:?}" ,
133- ingester. domain_name,
134- err
135- ) ;
136- continue ;
137- }
138- }
139- }
140- Ok ( None ) => {
141- log:: error!( "Ingester at {} is not reachable" , & ingester. domain_name) ;
142- continue ;
143- }
144- Err ( err) => {
145- log:: error!(
146- "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}" ,
147- ingester. domain_name,
148- err
149- ) ;
150- return Err ( err) ;
151- }
116+ let path = RelativePathBuf :: from_iter ( [ stream_name, STREAM_ROOT_DIRECTORY ] ) ;
117+ let obs = CONFIG
118+ . storage ( )
119+ . get_object_store ( )
120+ . get_objects ( Some ( & path) )
121+ . await ?;
122+ let mut ingestion_size = 0u64 ;
123+ let mut storage_size = 0u64 ;
124+ let mut count = 0u64 ;
125+ for ob in obs {
126+ if let Ok ( stat) = serde_json:: from_slice :: < ObjectStoreFormat > ( & ob) {
127+ count += stat. stats . events ;
128+ ingestion_size += stat. stats . ingestion ;
129+ storage_size += stat. stats . storage ;
152130 }
153131 }
154132
155- Ok ( stats)
133+ let qs = QueriedStats :: new (
134+ "" ,
135+ Utc :: now ( ) ,
136+ IngestionStats :: new ( count, format ! ( "{} Bytes" , ingestion_size) , "json" ) ,
137+ StorageStats :: new ( format ! ( "{} Bytes" , storage_size) , "parquet" ) ,
138+ ) ;
139+
140+ Ok ( vec ! [ qs] )
156141}
157142
158143async fn send_stream_sync_request (
0 commit comments