diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index d91284a08..9d07e7e31 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -28,8 +28,8 @@ use crate::option::CONFIG;
 use crate::metrics::prom_utils::Metrics;
 use crate::stats::Stats;
 use crate::storage::object_storage::ingestor_metadata_path;
-use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
+use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
 use actix_web::http::header::{self, HeaderMap};
 use actix_web::{HttpRequest, Responder};
 use bytes::Bytes;
@@ -237,38 +237,18 @@ pub async fn fetch_stats_from_ingestors(
     let mut deleted_storage_size = 0u64;
     let mut deleted_count = 0u64;
     for ob in obs {
-        let stream_metadata: serde_json::Value =
+        let stream_metadata: ObjectStoreFormat =
             serde_json::from_slice(&ob).expect("stream.json is valid json");
-        let version = stream_metadata
-            .as_object()
-            .and_then(|meta| meta.get("version"))
-            .and_then(|version| version.as_str());
-        let stats = stream_metadata.get("stats").unwrap();
-        if matches!(version, Some("v4")) {
-            let current_stats = stats.get("current_stats").unwrap().clone();
-            let lifetime_stats = stats.get("lifetime_stats").unwrap().clone();
-            let deleted_stats = stats.get("deleted_stats").unwrap().clone();
-
-            count += current_stats.get("events").unwrap().as_u64().unwrap();
-            ingestion_size += current_stats.get("ingestion").unwrap().as_u64().unwrap();
-            storage_size += current_stats.get("storage").unwrap().as_u64().unwrap();
-            lifetime_count += lifetime_stats.get("events").unwrap().as_u64().unwrap();
-            lifetime_ingestion_size += lifetime_stats.get("ingestion").unwrap().as_u64().unwrap();
-            lifetime_storage_size += lifetime_stats.get("storage").unwrap().as_u64().unwrap();
-            deleted_count += deleted_stats.get("events").unwrap().as_u64().unwrap();
-            deleted_ingestion_size += deleted_stats.get("ingestion").unwrap().as_u64().unwrap();
-            deleted_storage_size += deleted_stats.get("storage").unwrap().as_u64().unwrap();
-        } else {
-            count += stats.get("events").unwrap().as_u64().unwrap();
-            ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap();
-            storage_size += stats.get("storage").unwrap().as_u64().unwrap();
-            lifetime_count += stats.get("events").unwrap().as_u64().unwrap();
-            lifetime_ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap();
-            lifetime_storage_size += stats.get("storage").unwrap().as_u64().unwrap();
-            deleted_count += 0;
-            deleted_ingestion_size += 0;
-            deleted_storage_size += 0;
-        }
+
+        count += stream_metadata.stats.current_stats.events;
+        ingestion_size += stream_metadata.stats.current_stats.ingestion;
+        storage_size += stream_metadata.stats.current_stats.storage;
+        lifetime_count += stream_metadata.stats.lifetime_stats.events;
+        lifetime_ingestion_size += stream_metadata.stats.lifetime_stats.ingestion;
+        lifetime_storage_size += stream_metadata.stats.lifetime_stats.storage;
+        deleted_count += stream_metadata.stats.deleted_stats.events;
+        deleted_ingestion_size += stream_metadata.stats.deleted_stats.ingestion;
+        deleted_storage_size += stream_metadata.stats.deleted_stats.storage;
     }
 
     let qs = QueriedStats::new(
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 49a7843a4..773856c2b 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -17,7 +17,7 @@
  */
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 use chrono::{Local, NaiveDateTime};
 use itertools::Itertools;
 use once_cell::sync::Lazy;
@@ -417,6 +417,45 @@ fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Schema {
     Schema::try_merge(vec![schema, current_schema]).unwrap()
 }
 
+/// This function updates the data type of the time partition field
+/// from Utf8 to Timestamp if it is not already a Timestamp,
+/// and updates the schema in storage. It is required only when
+/// migrating from version 1.2.0 and below, and will be removed
+/// in the future.
+pub async fn update_data_type_time_partition(
+    storage: &(impl ObjectStorage + ?Sized),
+    stream_name: &str,
+    schema: Schema,
+    meta: ObjectStoreFormat,
+) -> anyhow::Result<Schema> {
+    let mut schema = schema.clone();
+    if meta.time_partition.is_some() {
+        let time_partition = meta.time_partition.unwrap();
+        let time_partition_data_type = schema
+            .field_with_name(&time_partition)
+            .unwrap()
+            .data_type()
+            .clone();
+        if time_partition_data_type != DataType::Timestamp(TimeUnit::Millisecond, None) {
+            let mut fields = schema
+                .fields()
+                .iter()
+                .filter(|field| *field.name() != time_partition)
+                .cloned()
+                .collect::<Vec<Arc<Field>>>();
+            let time_partition_field = Arc::new(Field::new(
+                time_partition,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ));
+            fields.push(time_partition_field);
+            schema = Schema::new(fields);
+            storage.put_schema(stream_name, &schema).await?;
+        }
+    }
+    Ok(schema)
+}
+
 pub async fn load_stream_metadata_on_server_start(
     storage: &(impl ObjectStorage + ?Sized),
     stream_name: &str,
@@ -428,7 +467,8 @@ pub async fn load_stream_metadata_on_server_start(
         meta = serde_json::from_slice(&serde_json::to_vec(&stream_metadata_value).unwrap()).unwrap();
     }
-
+    let schema =
+        update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?;
     let mut retention = meta.retention.clone();
     let mut time_partition = meta.time_partition.clone();
     let mut time_partition_limit = meta.time_partition_limit.clone();
@@ -546,6 +586,8 @@ pub mod error {
     pub enum LoadError {
         #[error("Error while loading from object storage: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
+        #[error("Error: {0}")]
+        Anyhow(#[from] anyhow::Error),
     }
 }
}
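
Note on the cluster/mod.rs hunk: the typed parse works only because every ingestor's stream.json is assumed to already be in the v4 layout; the deleted `else` branch was what handled older layouts inline. Below is a minimal sketch of the pattern, with struct shapes assumed for illustration rather than copied from `server/src/storage` (the real `ObjectStoreFormat` carries many more fields):

```rust
use serde::Deserialize; // assumes serde with the "derive" feature

// Hypothetical, simplified shapes mirroring the field paths used in the diff.
#[derive(Deserialize)]
struct StatsSnapshot {
    events: u64,
    ingestion: u64,
    storage: u64,
}

#[derive(Deserialize)]
struct FullStats {
    current_stats: StatsSnapshot,
    lifetime_stats: StatsSnapshot,
    deleted_stats: StatsSnapshot,
}

#[derive(Deserialize)]
struct ObjectStoreFormat {
    stats: FullStats,
}

fn main() -> Result<(), serde_json::Error> {
    let body = br#"{
        "stats": {
            "current_stats":  { "events": 10, "ingestion": 2048, "storage": 512 },
            "lifetime_stats": { "events": 12, "ingestion": 4096, "storage": 768 },
            "deleted_stats":  { "events": 2,  "ingestion": 2048, "storage": 256 }
        }
    }"#;
    // One typed parse replaces the version check plus the chain of
    // .get(..).unwrap().as_u64().unwrap() lookups; a malformed stream.json
    // now fails in one place with a descriptive serde error instead of a
    // panic deep inside the field-walking code.
    let meta: ObjectStoreFormat = serde_json::from_slice(body)?;
    assert_eq!(meta.stats.lifetime_stats.events, 12);
    Ok(())
}
```

The trade-off is that any pre-v4 stream.json still present on an ingestor will now fail to deserialize rather than fall into the old compatibility branch, so this change presumes the metadata migration has already run everywhere.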
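Note on `update_data_type_time_partition`: the patch rebuilds the field list by filtering the time-partition column out and pushing a `Timestamp(Millisecond)` replacement, which as a side effect moves that column to the end of the schema and forces it nullable. A pure-schema sketch of the same coercion that preserves column order is shown below; `coerce_time_partition` is a hypothetical helper for unit testing, not part of the patch:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

// Rebuilds `schema` so the named time-partition field has type
// Timestamp(Millisecond), leaving every other field untouched.
fn coerce_time_partition(schema: &Schema, time_partition: &str) -> Schema {
    let target = DataType::Timestamp(TimeUnit::Millisecond, None);
    let fields: Fields = schema
        .fields()
        .iter()
        .map(|field| {
            if field.name() == time_partition && field.data_type() != &target {
                // Replace the field in place so the column order is kept
                // (the patch instead appends the replacement at the end).
                Arc::new(Field::new(field.name().clone(), target.clone(), true))
            } else {
                Arc::clone(field)
            }
        })
        .collect();
    Schema::new(fields)
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("event_time", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
    ]);
    let migrated = coerce_time_partition(&schema, "event_time");
    assert_eq!(
        migrated.field(0).data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );
}
```

Splitting the coercion out this way would also let the storage write in `update_data_type_time_partition` be reduced to a single `put_schema` call guarded by a "did anything change" check, but that refactor is beyond what this patch attempts.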