@@ -28,8 +28,8 @@ use crate::option::CONFIG;
2828use crate :: metrics:: prom_utils:: Metrics ;
2929use crate :: stats:: Stats ;
3030use crate :: storage:: object_storage:: ingestor_metadata_path;
31- use crate :: storage:: PARSEABLE_ROOT_DIRECTORY ;
3231use crate :: storage:: { ObjectStorageError , STREAM_ROOT_DIRECTORY } ;
32+ use crate :: storage:: { ObjectStoreFormat , PARSEABLE_ROOT_DIRECTORY } ;
3333use actix_web:: http:: header:: { self , HeaderMap } ;
3434use actix_web:: { HttpRequest , Responder } ;
3535use bytes:: Bytes ;
@@ -237,38 +237,18 @@ pub async fn fetch_stats_from_ingestors(
237237 let mut deleted_storage_size = 0u64 ;
238238 let mut deleted_count = 0u64 ;
239239 for ob in obs {
240- let stream_metadata: serde_json :: Value =
240+ let stream_metadata: ObjectStoreFormat =
241241 serde_json:: from_slice ( & ob) . expect ( "stream.json is valid json" ) ;
242- let version = stream_metadata
243- . as_object ( )
244- . and_then ( |meta| meta. get ( "version" ) )
245- . and_then ( |version| version. as_str ( ) ) ;
246- let stats = stream_metadata. get ( "stats" ) . unwrap ( ) ;
247- if matches ! ( version, Some ( "v4" ) ) {
248- let current_stats = stats. get ( "current_stats" ) . unwrap ( ) . clone ( ) ;
249- let lifetime_stats = stats. get ( "lifetime_stats" ) . unwrap ( ) . clone ( ) ;
250- let deleted_stats = stats. get ( "deleted_stats" ) . unwrap ( ) . clone ( ) ;
251-
252- count += current_stats. get ( "events" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
253- ingestion_size += current_stats. get ( "ingestion" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
254- storage_size += current_stats. get ( "storage" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
255- lifetime_count += lifetime_stats. get ( "events" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
256- lifetime_ingestion_size += lifetime_stats. get ( "ingestion" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
257- lifetime_storage_size += lifetime_stats. get ( "storage" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
258- deleted_count += deleted_stats. get ( "events" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
259- deleted_ingestion_size += deleted_stats. get ( "ingestion" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
260- deleted_storage_size += deleted_stats. get ( "storage" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
261- } else {
262- count += stats. get ( "events" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
263- ingestion_size += stats. get ( "ingestion" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
264- storage_size += stats. get ( "storage" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
265- lifetime_count += stats. get ( "events" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
266- lifetime_ingestion_size += stats. get ( "ingestion" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
267- lifetime_storage_size += stats. get ( "storage" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
268- deleted_count += 0 ;
269- deleted_ingestion_size += 0 ;
270- deleted_storage_size += 0 ;
271- }
242+
243+ count += stream_metadata. stats . current_stats . events ;
244+ ingestion_size += stream_metadata. stats . current_stats . ingestion ;
245+ storage_size += stream_metadata. stats . current_stats . storage ;
246+ lifetime_count += stream_metadata. stats . lifetime_stats . events ;
247+ lifetime_ingestion_size += stream_metadata. stats . lifetime_stats . ingestion ;
248+ lifetime_storage_size += stream_metadata. stats . lifetime_stats . storage ;
249+ deleted_count += stream_metadata. stats . deleted_stats . events ;
250+ deleted_ingestion_size += stream_metadata. stats . deleted_stats . ingestion ;
251+ deleted_storage_size += stream_metadata. stats . deleted_stats . storage ;
272252 }
273253
274254 let qs = QueriedStats :: new (
0 commit comments