diff --git a/server/src/catalog.rs b/server/src/catalog.rs
index 5243fd106..e93f6cdd4 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog.rs
@@ -20,9 +20,12 @@ use std::{io::ErrorKind, sync::Arc};
 use self::{column::Column, snapshot::ManifestItem};
 use crate::handlers::http::base_path_without_preceding_slash;
+use crate::metadata::STREAM_INFO;
 use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
 use crate::option::CONFIG;
-use crate::stats::{event_labels_date, storage_size_labels_date, update_deleted_stats};
+use crate::stats::{
+    event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
+};
 use crate::{
     catalog::manifest::Manifest,
     event::DEFAULT_TIMESTAMP_KEY,
@@ -103,9 +106,8 @@ pub async fn update_snapshot(
     change: manifest::File,
 ) -> Result<(), ObjectStorageError> {
     let mut meta = storage.get_object_store_format(stream_name).await?;
-    let meta_clone = meta.clone();
     let manifests = &mut meta.snapshot.manifest_list;
-    let time_partition = &meta_clone.time_partition;
+    let time_partition = &meta.time_partition;
     let lower_bound = match time_partition {
         Some(time_partition) => {
             let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string());
@@ -174,12 +176,17 @@ pub async fn update_snapshot(
         }
     }
-    meta.snapshot.manifest_list = manifests.to_vec();
-    storage.put_snapshot(stream_name, meta.snapshot).await?;
     if ch {
         if let Some(mut manifest) = storage.get_manifest(&path).await? {
             manifest.apply_change(change);
             storage.put_manifest(&path, manifest).await?;
+            let stats = get_current_stats(stream_name, "json");
+            if let Some(stats) = stats {
+                meta.stats = stats;
+            }
+            meta.snapshot.manifest_list = manifests.to_vec();
+
+            storage.put_stream_manifest(stream_name, &meta).await?;
         } else {
             //instead of returning an error, create a new manifest (otherwise local to storage sync fails)
             //but don't update the snapshot
@@ -189,7 +196,7 @@ pub async fn update_snapshot(
                 storage.clone(),
                 stream_name,
                 false,
-                meta_clone,
+                meta,
                 events_ingested,
                 ingestion_size,
                 storage_size,
@@ -203,7 +210,7 @@ pub async fn update_snapshot(
                 storage.clone(),
                 stream_name,
                 true,
-                meta_clone,
+                meta,
                 events_ingested,
                 ingestion_size,
                 storage_size,
@@ -217,7 +224,7 @@ pub async fn update_snapshot(
             storage.clone(),
             stream_name,
             true,
-            meta_clone,
+            meta,
             events_ingested,
             ingestion_size,
             storage_size,
@@ -256,6 +263,30 @@ async fn create_manifest(
         files: vec![change],
         ..Manifest::default()
     };
+    let mut first_event_at = STREAM_INFO.get_first_event(stream_name)?;
+    if first_event_at.is_none() {
+        if let Some(first_event) = manifest.files.first() {
+            let time_partition = &meta.time_partition;
+            let lower_bound = match time_partition {
+                Some(time_partition) => {
+                    let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string());
+                    lower_bound
+                }
+                None => {
+                    let (lower_bound, _) =
+                        get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
+                    lower_bound
+                }
+            };
+            first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
+            if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
+                log::error!(
+                    "Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
+                    stream_name
+                );
+            }
+        }
+    }
     let mainfest_file_name = manifest_path("").to_string();
     let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
@@ -275,7 +306,12 @@ async fn create_manifest(
     };
     manifests.push(new_snapshot_entry);
     meta.snapshot.manifest_list = manifests;
-    storage.put_snapshot(stream_name, meta.snapshot).await?;
+    let stats = get_current_stats(stream_name, "json");
+    if let Some(stats) = stats {
+        meta.stats = stats;
+    }
+    meta.first_event_at = first_event_at;
+    storage.put_stream_manifest(stream_name, &meta).await?;
     }
     Ok(())
 }
@@ -294,6 +330,8 @@ pub async fn remove_manifest_from_snapshot(
         let manifests = &mut meta.snapshot.manifest_list;
         // Filter out items whose manifest_path contains any of the dates_to_delete
         manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
+        meta.first_event_at = None;
+        STREAM_INFO.set_first_event_at(stream_name, None)?;
         storage.put_snapshot(stream_name, meta.snapshot).await?;
     }
     match CONFIG.parseable.mode {
@@ -313,39 +351,48 @@ pub async fn get_first_event(
     match CONFIG.parseable.mode {
         Mode::All | Mode::Ingest => {
             // get current snapshot
-            let mut meta = storage.get_object_store_format(stream_name).await?;
-            let manifests = &mut meta.snapshot.manifest_list;
-            let time_partition = meta.time_partition;
-            if manifests.is_empty() {
-                log::info!("No manifest found for stream {stream_name}");
-                return Err(ObjectStorageError::Custom("No manifest found".to_string()));
-            }
-            let manifest = &manifests[0];
-            let path = partition_path(
-                stream_name,
-                manifest.time_lower_bound,
-                manifest.time_upper_bound,
-            );
-            let Some(manifest) = storage.get_manifest(&path).await? else {
-                return Err(ObjectStorageError::UnhandledError(
-                    "Manifest found in snapshot but not in object-storage"
-                        .to_string()
-                        .into(),
-                ));
-            };
-            if let Some(first_event) = manifest.files.first() {
-                let lower_bound = match time_partition {
-                    Some(time_partition) => {
-                        let (lower_bound, _) = get_file_bounds(first_event, time_partition);
-                        lower_bound
-                    }
-                    None => {
-                        let (lower_bound, _) =
-                            get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
-                        lower_bound
-                    }
+            let stream_first_event = STREAM_INFO.get_first_event(stream_name)?;
+            if stream_first_event.is_some() {
+                first_event_at = stream_first_event.unwrap();
+            } else {
+                let mut meta = storage.get_object_store_format(stream_name).await?;
+                let meta_clone = meta.clone();
+                let manifests = meta_clone.snapshot.manifest_list;
+                let time_partition = meta_clone.time_partition;
+                if manifests.is_empty() {
+                    log::info!("No manifest found for stream {stream_name}");
+                    return Err(ObjectStorageError::Custom("No manifest found".to_string()));
+                }
+                let manifest = &manifests[0];
+                let path = partition_path(
+                    stream_name,
+                    manifest.time_lower_bound,
+                    manifest.time_upper_bound,
+                );
+                let Some(manifest) = storage.get_manifest(&path).await? else {
+                    return Err(ObjectStorageError::UnhandledError(
+                        "Manifest found in snapshot but not in object-storage"
+                            .to_string()
+                            .into(),
+                    ));
                 };
-            first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
+                if let Some(first_event) = manifest.files.first() {
+                    let lower_bound = match time_partition {
+                        Some(time_partition) => {
+                            let (lower_bound, _) = get_file_bounds(first_event, time_partition);
+                            lower_bound
+                        }
+                        None => {
+                            let (lower_bound, _) =
+                                get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
+                            lower_bound
+                        }
+                    };
+                    first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
+                    meta.first_event_at = Some(first_event_at.clone());
+                    storage.put_stream_manifest(stream_name, &meta).await?;
+                    STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
+                }
             }
         }
         Mode::Query => {
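The catalog changes above hinge on a read-through cache: `get_first_event` now consults the in-memory `STREAM_INFO` registry first and only falls back to the manifests in object storage on a miss, writing the computed value back to both `stream.json` and the registry. A minimal, dependency-free sketch of that pattern — the `FirstEventCache` type and `get_or_compute` name are illustrative, not Parseable's API:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical mini version of what STREAM_INFO does for first_event_at:
// a lock-guarded map consulted before any object-storage round-trip.
struct FirstEventCache {
    inner: RwLock<HashMap<String, Option<String>>>,
}

impl FirstEventCache {
    fn new() -> Self {
        Self { inner: RwLock::new(HashMap::new()) }
    }

    // Read-through lookup: return the cached value, or compute and cache it
    // with `fallback` (standing in for the manifest scan in the diff).
    fn get_or_compute<F>(&self, stream: &str, fallback: F) -> Option<String>
    where
        F: FnOnce() -> Option<String>,
    {
        if let Some(Some(v)) = self.inner.read().unwrap().get(stream) {
            return Some(v.clone());
        }
        let computed = fallback();
        self.inner
            .write()
            .unwrap()
            .insert(stream.to_string(), computed.clone());
        computed
    }
}

fn main() {
    let cache = FirstEventCache::new();
    // First call hits the fallback; the second is served from memory.
    let a = cache.get_or_compute("mystream", || Some("2024-05-01T00:00:00+00:00".into()));
    let b = cache.get_or_compute("mystream", || unreachable!("cache hit expected"));
    assert_eq!(a, b);
    println!("first_event_at = {:?}", a);
}
```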
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index ed697af00..82cd9e3aa 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -221,6 +221,9 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
             }
         }
         DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
-        _ => unreachable!(),
+        _ => {
+            log::error!("Unsupported datatype {:?}, value {:?}", data_type, value);
+            unreachable!()
+        }
     }
 }
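The `valid_type` change only adds logging before the panic, so an "impossible" schema/value pairing is diagnosable from the logs rather than a bare `unreachable!()`. A self-contained sketch of the same log-then-abort arm, with simplified stand-ins for the Arrow `DataType` and serde_json `Value` types:

```rust
// Simplified stand-ins; only the shape of the check matters here.
#[allow(dead_code)]
#[derive(Debug)]
enum DataType { Boolean, Int64, Utf8, Timestamp }

#[allow(dead_code)]
#[derive(Debug)]
enum Value { Bool(bool), Number(f64), String(String) }

fn valid_type(data_type: &DataType, value: &Value) -> bool {
    match data_type {
        DataType::Boolean => matches!(value, Value::Bool(_)),
        DataType::Int64 => matches!(value, Value::Number(_)),
        DataType::Utf8 => matches!(value, Value::String(_)),
        // Timestamps may arrive as epoch numbers or RFC 3339 strings.
        DataType::Timestamp => matches!(value, Value::Number(_) | Value::String(_)),
        // The patched arm: record the offending pair before aborting, so the
        // failure is diagnosable after the fact.
        #[allow(unreachable_patterns)]
        other => {
            eprintln!("Unsupported datatype {:?}, value {:?}", other, value);
            unreachable!()
        }
    }
}

fn main() {
    assert!(valid_type(&DataType::Timestamp, &Value::Number(1_714_521_600.0)));
    assert!(!valid_type(&DataType::Boolean, &Value::String("yes".into())));
}
```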
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index c823e801d..1476e1aad 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -31,6 +31,7 @@ use crate::handlers::{
     STREAM_NAME_HEADER_KEY,
 };
 use crate::localcache::CacheError;
+use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::{self, STREAM_INFO};
 use crate::option::{Mode, CONFIG};
 use crate::storage::{LogStream, ObjectStorageError};
@@ -62,7 +63,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError>
 ) -> Result<(), PostError> {
-    create_stream_if_not_exists(&stream_name).await?;
+    create_stream_if_not_exists(&stream_name, true).await?;
     let size: usize = body.len();
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
@@ -115,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError>
 ) -> Result<(), PostError> {
-    let glob_storage = CONFIG.storage().get_object_store();
-    let object_store_format = glob_storage
-        .get_object_store_format(&stream_name)
-        .await
-        .map_err(|_| PostError::StreamNotFound(stream_name.clone()))?;
-
-    let time_partition = object_store_format.time_partition;
-    let time_partition_limit = object_store_format.time_partition_limit;
-    let static_schema_flag = object_store_format.static_schema_flag;
-    let custom_partition = object_store_format.custom_partition;
+    let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
+    let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?;
+    let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?;
+    let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?;
     let body_val: Value = serde_json::from_slice(&body)?;
     let size: usize = body.len();
     let mut parsed_timestamp = Utc::now().naive_utc();
@@ -414,7 +408,10 @@ fn into_event_batch(
 }
 
 // Check if the stream exists and create a new stream if doesn't exist
-pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
+pub async fn create_stream_if_not_exists(
+    stream_name: &str,
+    internal_stream: bool,
+) -> Result<(), PostError> {
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
     }
@@ -427,6 +424,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError>
             "",
             "",
             Arc::new(Schema::empty()),
+            internal_stream,
         )
         .await?;
     }
@@ -440,7 +438,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError>
     }) {
         log::error!("Stream {} not found", stream_name);
         return Err(PostError::Invalid(anyhow::anyhow!(
-            "Stream {} not found. Has it been created?",
+            "Stream `{}` not found. Please create it using the Query server.",
            stream_name
        )));
    }
@@ -472,6 +470,8 @@ pub enum PostError {
     Invalid(#[from] anyhow::Error),
     #[error("{0}")]
     CreateStream(#[from] CreateStreamError),
+    #[error("Error: {0}")]
+    MetadataStreamError(#[from] MetadataError),
     #[allow(unused)]
     #[error("Error: {0}")]
     CustomError(String),
@@ -498,6 +498,7 @@ impl actix_web::ResponseError for PostError {
                 StatusCode::BAD_REQUEST
             }
             PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PostError::MetadataStreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
             PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
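The new `MetadataStreamError` variant follows the existing convention in `PostError`: every failure class maps onto an HTTP status in `ResponseError::status_code`. A trimmed, runnable sketch of that mapping — the enum below is modeled on the diff, with plain `u16` codes standing in for `actix_web::http::StatusCode`:

```rust
// Modeled on the handler's error enum after the change; not the real type.
#[allow(dead_code)]
#[derive(Debug)]
enum PostError {
    StreamNotFound(String),
    Invalid(String),
    MetadataStreamError(String),
}

fn status_code(err: &PostError) -> u16 {
    match err {
        PostError::StreamNotFound(_) => 404, // NOT_FOUND
        PostError::Invalid(_) => 400,        // BAD_REQUEST
        // In-memory metadata lookups can now fail independently of object
        // storage, so the new variant surfaces as a server-side error.
        PostError::MetadataStreamError(_) => 500, // INTERNAL_SERVER_ERROR
    }
}

fn main() {
    let err = PostError::MetadataStreamError("stream meta not found".into());
    println!("{:?} -> HTTP {}", err, status_code(&err));
}
```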
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 6fe7e78ab..5045dcf8b 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -19,9 +19,7 @@ use self::error::{CreateStreamError, StreamError};
 use super::base_path_without_preceding_slash;
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
-use super::cluster::{
-    fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, INTERNAL_STREAM_NAME,
-};
+use super::cluster::{fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors};
 use crate::alerts::Alerts;
 use crate::handlers::{
     CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
@@ -105,28 +103,8 @@ pub async fn retention_cleanup(
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let storage = CONFIG.storage().get_object_store();
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        // here the ingest server has not found the stream
-        // so it should check if the stream exists in storage
-        let check = storage
-            .list_streams()
-            .await?
-            .iter()
-            .map(|stream| stream.name.clone())
-            .contains(&stream_name);
-
-        if !check {
-            log::error!("Stream {} not found", stream_name.clone());
-            return Err(StreamError::StreamNotFound(stream_name.clone()));
-        }
-        metadata::STREAM_INFO
-            .upsert_stream_info(
-                &*storage,
-                LogStream {
-                    name: stream_name.clone().to_owned(),
-                },
-            )
-            .await
-            .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
+        log::error!("Stream {} not found", stream_name.clone());
+        return Err(StreamError::StreamNotFound(stream_name.clone()));
     }
     let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
     let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
@@ -270,6 +248,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError>
 pub async fn get_retention(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    let objectstore = CONFIG.storage().get_object_store();
-
-    if !objectstore.stream_exists(&stream_name).await? {
+    if !STREAM_INFO.stream_exists(&stream_name) {
         return Err(StreamError::StreamNotFound(stream_name.to_string()));
     }
+    let retention = STREAM_INFO.get_retention(&stream_name);
-    let retention = CONFIG
-        .storage()
-        .get_object_store()
-        .get_retention(&stream_name)
-        .await?;
-
-    Ok((web::Json(retention), StatusCode::OK))
+    match retention {
+        Ok(retention) => {
+            if let Some(retention) = retention {
+                Ok((web::Json(retention), StatusCode::OK))
+            } else {
+                Ok((web::Json(Retention::default()), StatusCode::OK))
+            }
+        }
+        Err(err) => Err(StreamError::from(err)),
+    }
 }
 
 pub async fn put_retention(
@@ -468,6 +449,10 @@ pub async fn put_retention(
         .put_retention(&stream_name, &retention)
         .await?;
 
+    metadata::STREAM_INFO
+        .set_retention(&stream_name, retention)
+        .expect("retention set on existing stream");
+
     Ok((
         format!("set retention configuration for log stream {stream_name}"),
         StatusCode::OK,
@@ -554,7 +539,7 @@ pub async fn put_enable_cache(
         }
     }
     let enable_cache = body.into_inner();
-    let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?;
+    let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
     stream_metadata.cache_enabled = enable_cache;
     storage
         .put_stream_manifest(&stream_name, &stream_metadata)
@@ -779,15 +764,16 @@ pub async fn create_stream(
     custom_partition: &str,
     static_schema_flag: &str,
     schema: Arc<Schema>,
+    internal_stream: bool,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
-    if stream_name.ne(INTERNAL_STREAM_NAME) {
+    if !internal_stream {
         validator::stream_name(&stream_name)?;
     }
-    // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
-    if let Err(err) = storage
+
+    match storage
         .create_stream(
             &stream_name,
             time_partition,
@@ -798,36 +784,31 @@ pub async fn create_stream(
         )
         .await
     {
-        return Err(CreateStreamError::Storage { stream_name, err });
-    }
-
-    let stream_meta = CONFIG
-        .storage()
-        .get_object_store()
-        .get_stream_metadata(&stream_name)
-        .await;
-    let stream_meta = stream_meta.unwrap();
-    let created_at = stream_meta.created_at;
-    let mut static_schema: HashMap<String, Arc<Field>> = HashMap::new();
-
-    for (field_name, field) in schema
-        .fields()
-        .iter()
-        .map(|field| (field.name().to_string(), field.clone()))
-    {
-        static_schema.insert(field_name, field);
-    }
+        Ok(created_at) => {
+            let mut static_schema: HashMap<String, Arc<Field>> = HashMap::new();
 
-    metadata::STREAM_INFO.add_stream(
-        stream_name.to_string(),
-        created_at,
-        time_partition.to_string(),
-        time_partition_limit.to_string(),
-        custom_partition.to_string(),
-        static_schema_flag.to_string(),
-        static_schema,
-    );
+            for (field_name, field) in schema
+                .fields()
+                .iter()
+                .map(|field| (field.name().to_string(), field.clone()))
+            {
+                static_schema.insert(field_name, field);
+            }
+            metadata::STREAM_INFO.add_stream(
+                stream_name.to_string(),
+                created_at,
+                time_partition.to_string(),
+                time_partition_limit.to_string(),
+                custom_partition.to_string(),
+                static_schema_flag.to_string(),
+                static_schema,
+            );
+        }
+        Err(err) => {
+            return Err(CreateStreamError::Storage { stream_name, err });
+        }
+    }
     Ok(())
 }
@@ -896,7 +877,7 @@ pub mod error {
     #[derive(Debug, thiserror::Error)]
     pub enum CreateStreamError {
-        #[error("Stream name validation failed due to {0}")]
+        #[error("Stream name validation failed: {0}")]
         StreamNameValidation(#[from] StreamNameValidationError),
         #[error("failed to create log stream {stream_name} due to err: {err}")]
         Storage {
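The handlers above adopt a write-through discipline: `put_retention` persists to object storage first, then updates the in-memory map, so `get_retention` can be served entirely from memory and fall back to `Retention::default()` when nothing was ever configured. A small runnable sketch of that discipline under assumed, simplified types:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Simplified stand-in for the server's Retention config.
#[derive(Debug, Clone, Default)]
struct Retention {
    days: u32,
}

// Hypothetical mini version of STREAM_INFO: a lock-guarded stream map.
struct StreamInfo(RwLock<HashMap<String, Option<Retention>>>);

impl StreamInfo {
    // Write-through update: the caller persists to storage first (not
    // shown), then records the value here so reads never hit storage.
    fn set_retention(&self, stream: &str, r: Retention) -> Result<(), String> {
        let mut map = self.0.write().unwrap();
        match map.get_mut(stream) {
            Some(slot) => {
                *slot = Some(r);
                Ok(())
            }
            None => Err(format!("unknown stream {stream}")),
        }
    }

    // Read served from memory; a stream with no config yields None.
    fn get_retention(&self, stream: &str) -> Option<Retention> {
        self.0.read().unwrap().get(stream).cloned().flatten()
    }
}

fn main() {
    let info = StreamInfo(RwLock::new(HashMap::from([("logs".to_string(), None)])));
    info.set_retention("logs", Retention { days: 30 }).unwrap();
    // Missing config maps to a default instead of an error, as in the handler.
    let r = info.get_retention("logs").unwrap_or_default();
    println!("retention: {r:?}");
}
```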
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 24e978dda..b07cc50af 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -15,14 +15,12 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  *
  */
-
 use crate::analytics;
 use crate::banner;
 use crate::handlers::airplane;
 use crate::handlers::http::logstream;
 use crate::handlers::http::middleware::RouteExt;
 use crate::localcache::LocalCacheManager;
-use crate::metadata;
 use crate::metrics;
 use crate::migration;
 use crate::migration::metadata_migration::migrate_ingester_metadata;
@@ -52,6 +50,7 @@ use actix_web_prometheus::PrometheusMetrics;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use base64::Engine;
+use bytes::Bytes;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
 use relative_path::RelativePathBuf;
@@ -110,17 +109,15 @@ impl ParseableServer for IngestServer {
         self.validate()?;
 
         // check for querier state. Is it there, or was it there in the past
-        self.check_querier_state().await?;
+        let parseable_json = self.check_querier_state().await?;
 
         // to get the .parseable.json file in staging
         self.validate_credentials().await?;
-
-        let metadata = storage::resolve_parseable_metadata().await?;
+        let metadata = storage::resolve_parseable_metadata(&parseable_json).await?;
         banner::print(&CONFIG, &metadata).await;
         rbac::map::init(&metadata);
         // set the info in the global metadata
         metadata.set_global();
-
         self.initialize().await
     }
@@ -219,7 +216,7 @@ impl IngestServer {
     // create the ingestor metadata and put the .ingestor.json file in the object store
     async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
-        migrate_ingester_metadata().await?;
+        let storage_ingestor_metadata = migrate_ingester_metadata().await?;
         let store = CONFIG.storage().get_object_store();
 
         // find the meta file in staging if not generate new metadata
@@ -229,18 +226,10 @@ impl IngestServer {
         let path = ingestor_metadata_path(None);
 
         // we are considering that we can always get from object store
-        if let Ok(store_meta) = store.get_object(&path).await {
-            log::info!("Ingestor Metadata is present. Checking for updates");
-            let mut store_data = serde_json::from_slice::<IngestorMetadata>(&store_meta)
-                .map_err(|_| anyhow!("IngestorMetadata was not parseable as valid json"))?;
+        if storage_ingestor_metadata.is_some() {
+            let mut store_data = storage_ingestor_metadata.unwrap();
 
             if store_data.domain_name != INGESTOR_META.domain_name {
-                log::info!("Ingestor Metadata update needed.");
-                log::info!(
-                    "Old Domain Name: {}, New Domain Name: {}",
-                    store_data.domain_name,
-                    INGESTOR_META.domain_name
-                );
                 store_data
                     .domain_name
                     .clone_from(&INGESTOR_META.domain_name);
@@ -256,20 +245,20 @@ impl IngestServer {
                     .await
                     .map_err(|err| anyhow!(err));
             }
-        }
-
-        let resource = serde_json::to_string(&resource)?
-            .try_into_bytes()
-            .map_err(|err| anyhow!(err))?;
+        } else {
+            let resource = serde_json::to_string(&resource)?
+                .try_into_bytes()
+                .map_err(|err| anyhow!(err))?;
 
-        store.put_object(&path, resource).await?;
+            store.put_object(&path, resource).await?;
+        }
 
         Ok(())
     }
 
     // check for querier state. Is it there, or was it there in the past
     // this should happen before the set the ingestor metadata
-    async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> {
+    async fn check_querier_state(&self) -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
         // how do we check for querier state?
         // based on the work flow of the system, the querier will always need to start first
         // i.e the querier will create the `.parseable.json` file
@@ -277,8 +266,9 @@ impl IngestServer {
         let store = CONFIG.storage().get_object_store();
         let path = parseable_json_path();
 
-        match store.get_object(&path).await {
-            Ok(_) => Ok(()),
+        let parseable_json = store.get_object(&path).await;
+        match parseable_json {
+            Ok(_) => Ok(Some(parseable_json.unwrap())),
             Err(_) => Err(ObjectStorageError::Custom(
                 "Query Server has not been started yet. Please start the querier server first."
                     .to_string(),
@@ -297,10 +287,8 @@ impl IngestServer {
             )
             .await?
             .iter()
-            // this unwrap will most definateley shoot me in the foot later
             .map(|x| serde_json::from_slice::<IngestorMetadata>(x).unwrap_or_default())
             .collect_vec();
-
         if !ingestor_metadata.is_empty() {
             let check = ingestor_metadata[0].token.clone();
 
@@ -312,7 +300,6 @@ impl IngestServer {
             let token = format!("Basic {}", token);
 
             if check != token {
-                log::error!("Credentials do not match with other ingestors. Please check your credentials and try again.");
                 return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
             }
         }
@@ -333,13 +320,6 @@ impl IngestServer {
 
         migration::run_migration(&CONFIG).await?;
 
-        let storage = CONFIG.storage().get_object_store();
-        if let Err(err) = metadata::STREAM_INFO.load(&*storage).await {
-            log::warn!("could not populate local metadata. {:?}", err);
-        }
-
-        metrics::fetch_stats_from_storage().await;
-
         let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync();
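`check_querier_state` now returns the `.parseable.json` payload it already fetched instead of discarding it, so startup performs one object-store read where it previously needed two (the probe here, then `resolve_parseable_metadata` again). The shape of that change, with a closure standing in for `store.get_object(&path)` and `Vec<u8>` for `Bytes`:

```rust
// `fetch` stands in for the object-store GET; names are illustrative.
fn check_querier_state(
    fetch: impl Fn() -> Result<Vec<u8>, String>,
) -> Result<Option<Vec<u8>>, String> {
    match fetch() {
        // Previously this arm returned Ok(()) and threw the payload away;
        // returning it lets the caller reuse the bytes without a second GET.
        Ok(bytes) => Ok(Some(bytes)),
        Err(_) => Err(
            "Query Server has not been started yet. Please start the querier server first."
                .to_string(),
        ),
    }
}

fn main() {
    let payload = check_querier_state(|| Ok(br#"{"version":"v4"}"#.to_vec())).unwrap();
    println!("fetched {} bytes once, reused downstream", payload.unwrap().len());
}
```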
{:?}", e); - } - FILTERS.load().await?; DASHBOARDS.load().await?; - - // load data from stats back to prometheus metrics - metrics::fetch_stats_from_storage().await; // track all parquet files already in the data directory storage::retention::load_retention_from_global(); diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index dd809de6d..bcb56b6ab 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -29,7 +29,6 @@ use crate::handlers::http::users::filters; use crate::handlers::http::API_BASE_PATH; use crate::handlers::http::API_VERSION; use crate::localcache::LocalCacheManager; -use crate::metadata; use crate::metrics; use crate::migration; use crate::rbac; @@ -115,9 +114,9 @@ impl ParseableServer for Server { async fn init(&self) -> anyhow::Result<()> { self.validate()?; migration::run_file_migration(&CONFIG).await?; - CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG).await?; - let metadata = storage::resolve_parseable_metadata().await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); @@ -499,15 +498,9 @@ impl Server { migration::run_migration(&CONFIG).await?; - let storage = CONFIG.storage().get_object_store(); - if let Err(err) = metadata::STREAM_INFO.load(&*storage).await { - log::warn!("could not populate local metadata. {:?}", err); - } - FILTERS.load().await?; DASHBOARDS.load().await?; - metrics::fetch_stats_from_storage().await; storage::retention::load_retention_from_global(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); diff --git a/server/src/handlers/http/otel.rs b/server/src/handlers/http/otel.rs index 24a5d3547..fdb5d6036 100644 --- a/server/src/handlers/http/otel.rs +++ b/server/src/handlers/http/otel.rs @@ -41,13 +41,13 @@ fn collect_json_from_any_value( if value.int_val.is_some() { value_json.insert( key.to_string(), - Value::Number(serde_json::Number::from(value.int_val.unwrap())), + Value::String(value.int_val.as_ref().unwrap().to_owned()), ); } if value.double_val.is_some() { value_json.insert( key.to_string(), - Value::Number(serde_json::Number::from_f64(value.double_val.unwrap()).unwrap()), + Value::String(value.double_val.as_ref().unwrap().to_owned()), ); } @@ -130,7 +130,6 @@ fn value_to_string(value: serde_json::Value) -> String { pub fn flatten_otel_logs(body: &Bytes) -> Vec> { let mut vec_otel_json: Vec> = Vec::new(); let body_str = std::str::from_utf8(body).unwrap(); - let message: LogsData = serde_json::from_str(body_str).unwrap(); for records in message.resource_logs.iter() { for record in records.iter() { diff --git a/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs b/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs index 65a23ac6b..38b20b687 100644 --- a/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs +++ b/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs @@ -37,9 +37,9 @@ #[serde(rename = "boolValue")] pub bool_val: Option, #[serde(rename = "intValue")] - pub int_val: Option, + pub int_val: Option, #[serde(rename = "doubleValue")] - pub double_val: Option, + pub double_val: Option, #[serde(rename = "arrayValue")] pub array_val: Option, #[serde(rename = "keyVauleList")] diff --git 
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 7d1b671ea..d29527c53 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -30,6 +30,7 @@ use crate::metrics::{
     EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
     EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
 };
+use crate::storage::retention::Retention;
 use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir};
 use crate::utils::arrow::MergedRecordReader;
 use derive_more::{Deref, DerefMut};
@@ -45,6 +46,7 @@ pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);
 pub struct LogStreamMetadata {
     pub schema: HashMap<String, Arc<Field>>,
     pub alerts: Alerts,
+    pub retention: Option<Retention>,
     pub cache_enabled: bool,
     pub created_at: String,
     pub first_event_at: Option<String>,
@@ -97,6 +99,13 @@ impl StreamInfo {
             .map(|metadata| metadata.cache_enabled)
     }
 
+    pub fn get_first_event(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.first_event_at.clone())
+    }
+
     pub fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
         let map = self.read().expect(LOCK_EXPECT);
         map.get(stream_name)
@@ -104,6 +113,16 @@ impl StreamInfo {
             .map(|metadata| metadata.time_partition.clone())
     }
 
+    pub fn get_time_partition_limit(
+        &self,
+        stream_name: &str,
+    ) -> Result<Option<String>, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.time_partition_limit.clone())
+    }
+
     pub fn get_custom_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
         let map = self.read().expect(LOCK_EXPECT);
         map.get(stream_name)
@@ -121,6 +140,13 @@ impl StreamInfo {
             .map(|metadata| metadata.static_schema_flag.clone())
     }
 
+    pub fn get_retention(&self, stream_name: &str) -> Result<Option<Retention>, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.retention.clone())
+    }
+
     pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         let stream = map
@@ -159,6 +185,19 @@ impl StreamInfo {
         })
     }
 
+    pub fn set_retention(
+        &self,
+        stream_name: &str,
+        retention: Retention,
+    ) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
+        map.get_mut(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.retention = Some(retention);
+            })
+    }
+
     pub fn set_first_event_at(
         &self,
         stream_name: &str,
@@ -251,28 +290,16 @@ impl StreamInfo {
         map.remove(stream_name);
     }
 
-    pub async fn load(&self, storage: &(impl ObjectStorage + ?Sized)) -> Result<(), LoadError> {
-        // When loading streams this funtion will assume list_streams only returns valid streams.
-        // a valid stream would have a .schema file.
-        // .schema file could be empty in that case it will be treated as an uninitialized stream.
-        // return error in case of an error from object storage itself.
-
-        for stream in storage.list_streams().await? {
-            self.upsert_stream_info(storage, stream).await?;
-        }
-        Ok(())
-    }
-
     pub async fn upsert_stream_info(
         &self,
         storage: &(impl ObjectStorage + ?Sized),
         stream: LogStream,
     ) -> Result<(), LoadError> {
         let alerts = storage.get_alerts(&stream.name).await?;
-        let schema = storage.get_schema_on_server_start(&stream.name).await?;
-        let meta = storage.get_stream_metadata(&stream.name).await?;
-        let meta_clone = meta.clone();
-        let stream_name = stream.name.clone();
+
+        let schema = storage.upsert_schema_to_storage(&stream.name).await?;
+        let meta = storage.upsert_stream_metadata(&stream.name).await?;
+        let retention = meta.retention;
         let schema = update_schema_from_staging(&stream.name, schema);
         let schema = HashMap::from_iter(
             schema
@@ -284,6 +311,7 @@ impl StreamInfo {
         let metadata = LogStreamMetadata {
             schema,
             alerts,
+            retention,
             cache_enabled: meta.cache_enabled,
             created_at: meta.created_at,
             first_event_at: meta.first_event_at,
@@ -296,30 +324,9 @@ impl StreamInfo {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.insert(stream.name, metadata);
 
-        Self::load_daily_metrics(meta_clone, &stream_name);
-
         Ok(())
     }
 
-    fn load_daily_metrics(meta: ObjectStoreFormat, stream_name: &str) {
-        let manifests = meta.snapshot.manifest_list;
-        for manifest in manifests {
-            let manifest_date = manifest.time_lower_bound.date_naive().to_string();
-            let events_ingested = manifest.events_ingested;
-            let ingestion_size = manifest.ingestion_size;
-            let storage_size = manifest.storage_size;
-            EVENTS_INGESTED_DATE
-                .with_label_values(&[stream_name, "json", &manifest_date])
-                .set(events_ingested as i64);
-            EVENTS_INGESTED_SIZE_DATE
-                .with_label_values(&[stream_name, "json", &manifest_date])
-                .set(ingestion_size as i64);
-            EVENTS_STORAGE_SIZE_DATE
-                .with_label_values(&["data", stream_name, "parquet", &manifest_date])
-                .set(storage_size as i64);
-        }
-    }
-
     pub fn list_streams(&self) -> Vec<String> {
         self.read()
             .expect(LOCK_EXPECT)
@@ -368,6 +375,60 @@ fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Schema {
     Schema::try_merge(vec![schema, current_schema]).unwrap()
 }
 
+pub async fn load_stream_metadata_on_server_start(
+    storage: &(impl ObjectStorage + ?Sized),
+    stream_name: &str,
+    schema: Schema,
+    meta: &ObjectStoreFormat,
+) -> Result<(), LoadError> {
+    let alerts = storage.get_alerts(stream_name).await?;
+    let schema = update_schema_from_staging(stream_name, schema);
+    let schema = HashMap::from_iter(
+        schema
+            .fields
+            .iter()
+            .map(|v| (v.name().to_owned(), v.clone())),
+    );
+
+    let metadata = LogStreamMetadata {
+        schema,
+        alerts,
+        retention: meta.retention.clone(),
+        cache_enabled: meta.cache_enabled,
+        created_at: meta.created_at.clone(),
+        first_event_at: meta.first_event_at.clone(),
+        time_partition: meta.time_partition.clone(),
+        time_partition_limit: meta.time_partition_limit.clone(),
+        custom_partition: meta.custom_partition.clone(),
+        static_schema_flag: meta.static_schema_flag.clone(),
+    };
+
+    let mut map = STREAM_INFO.write().expect(LOCK_EXPECT);
+
+    map.insert(stream_name.to_string(), metadata);
+    load_daily_metrics(meta, stream_name);
+    Ok(())
+}
+
+fn load_daily_metrics(meta: &ObjectStoreFormat, stream_name: &str) {
+    let manifests = &meta.snapshot.manifest_list;
+    for manifest in manifests {
+        let manifest_date = manifest.time_lower_bound.date_naive().to_string();
+        let events_ingested = manifest.events_ingested;
+        let ingestion_size = manifest.ingestion_size;
+        let storage_size = manifest.storage_size;
+        EVENTS_INGESTED_DATE
+            .with_label_values(&[stream_name, "json", &manifest_date])
+            .set(events_ingested as i64);
+        EVENTS_INGESTED_SIZE_DATE
+            .with_label_values(&[stream_name, "json", &manifest_date])
+            .set(ingestion_size as i64);
+        EVENTS_STORAGE_SIZE_DATE
+            .with_label_values(&["data", stream_name, "parquet", &manifest_date])
+            .set(storage_size as i64);
+    }
+}
+
 pub mod error {
     pub mod stream_info {
         use crate::storage::ObjectStorageError;
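`load_stream_metadata_on_server_start` does two jobs in one pass over `stream.json`: it fills the `STREAM_INFO` map and replays per-day counters from the snapshot's manifest list into the date-labelled Prometheus gauges. A sketch of that replay using a plain map in place of the `*_DATE` gauge vectors — names mirror the diff, the return type is illustrative:

```rust
use std::collections::HashMap;

// One entry per manifest window, mirroring the fields the diff reads.
struct ManifestItem {
    date: String, // derived from time_lower_bound.date_naive() in the real code
    events_ingested: u64,
    ingestion_size: u64,
}

// Replays manifest counters into per-(stream, date) values, the same
// bookkeeping load_daily_metrics performs against the gauge vectors.
fn load_daily_metrics(
    stream: &str,
    manifests: &[ManifestItem],
) -> HashMap<(String, String), (i64, i64)> {
    let mut gauges = HashMap::new();
    for m in manifests {
        gauges.insert(
            (stream.to_string(), m.date.clone()),
            (m.events_ingested as i64, m.ingestion_size as i64),
        );
    }
    gauges
}

fn main() {
    let manifests = vec![
        ManifestItem { date: "2024-05-01".into(), events_ingested: 120, ingestion_size: 4096 },
        ManifestItem { date: "2024-05-02".into(), events_ingested: 80, ingestion_size: 2048 },
    ];
    let gauges = load_daily_metrics("logs", &manifests);
    println!("{} date-wise series primed at startup", gauges.len());
}
```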
diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs
index dafe34629..c034e580e 100644
--- a/server/src/metrics/mod.rs
+++ b/server/src/metrics/mod.rs
@@ -19,7 +19,7 @@
 pub mod prom_utils;
 pub mod storage;
 
-use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, option::CONFIG};
+use crate::{handlers::http::metrics_path, stats::FullStats};
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use once_cell::sync::Lazy;
 use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
@@ -257,42 +257,33 @@ fn prom_process_metrics(metrics: &PrometheusMetrics) {
 #[cfg(not(target_os = "linux"))]
 fn prom_process_metrics(_metrics: &PrometheusMetrics) {}
 
-pub async fn fetch_stats_from_storage() {
-    for stream_name in STREAM_INFO.list_streams() {
-        let stats = CONFIG
-            .storage()
-            .get_object_store()
-            .get_stats(&stream_name)
-            .await
-            .expect("stats are loaded properly");
+pub async fn fetch_stats_from_storage(stream_name: &str, stats: FullStats) {
+    EVENTS_INGESTED
+        .with_label_values(&[stream_name, "json"])
+        .set(stats.current_stats.events as i64);
+    EVENTS_INGESTED_SIZE
+        .with_label_values(&[stream_name, "json"])
+        .set(stats.current_stats.ingestion as i64);
+    STORAGE_SIZE
+        .with_label_values(&["data", stream_name, "parquet"])
+        .set(stats.current_stats.storage as i64);
+    EVENTS_DELETED
+        .with_label_values(&[stream_name, "json"])
+        .set(stats.deleted_stats.events as i64);
+    EVENTS_DELETED_SIZE
+        .with_label_values(&[stream_name, "json"])
+        .set(stats.deleted_stats.ingestion as i64);
+    DELETED_EVENTS_STORAGE_SIZE
+        .with_label_values(&["data", stream_name, "parquet"])
+        .set(stats.deleted_stats.storage as i64);
 
-        EVENTS_INGESTED
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.current_stats.events as i64);
-        EVENTS_INGESTED_SIZE
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.current_stats.ingestion as i64);
-        STORAGE_SIZE
-            .with_label_values(&["data", &stream_name, "parquet"])
-            .set(stats.current_stats.storage as i64);
-        EVENTS_DELETED
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.deleted_stats.events as i64);
-        EVENTS_DELETED_SIZE
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.deleted_stats.ingestion as i64);
-        DELETED_EVENTS_STORAGE_SIZE
-            .with_label_values(&["data", &stream_name, "parquet"])
-            .set(stats.deleted_stats.storage as i64);
-
-        LIFETIME_EVENTS_INGESTED
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.lifetime_stats.events as i64);
-        LIFETIME_EVENTS_INGESTED_SIZE
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.lifetime_stats.ingestion as i64);
-        LIFETIME_EVENTS_STORAGE_SIZE
-            .with_label_values(&["data", &stream_name, "parquet"])
-            .set(stats.lifetime_stats.storage as i64);
-    }
+    LIFETIME_EVENTS_INGESTED
+        .with_label_values(&[stream_name, "json"])
+        .set(stats.lifetime_stats.events as i64);
+    LIFETIME_EVENTS_INGESTED_SIZE
+        .with_label_values(&[stream_name, "json"])
+        .set(stats.lifetime_stats.ingestion as i64);
+    LIFETIME_EVENTS_STORAGE_SIZE
+        .with_label_values(&["data", stream_name, "parquet"])
+        .set(stats.lifetime_stats.storage as i64);
 }
diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs
index 0cd513929..e97efe6d2 100644
--- a/server/src/metrics/prom_utils.rs
+++ b/server/src/metrics/prom_utils.rs
@@ -172,10 +172,10 @@ impl Metrics {
                 "parseable_lifetime_events_ingested_size" => {
                     prom_dress.parseable_lifetime_events_ingested_size += val
                 }
-                "parseable_deleted_events_ingested" => {
+                "parseable_events_deleted" => {
                     prom_dress.parseable_deleted_events_ingested += val
                 }
-                "parseable_deleted_events_ingested_size" => {
+                "parseable_events_deleted_size" => {
                     prom_dress.parseable_deleted_events_ingested_size += val
                 }
                 "parseable_staging_files" => prom_dress.parseable_staging_files += val,
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 3a682b861..8d459df36 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -24,23 +24,34 @@ mod stream_metadata_migration;
 
 use std::{fs::OpenOptions, sync::Arc};
 
 use crate::{
+    metadata::load_stream_metadata_on_server_start,
+    metrics::fetch_stats_from_storage,
     option::Config,
     storage::{
         object_storage::{parseable_json_path, stream_json_path},
-        ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
-        SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+        ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME,
+        PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
     },
 };
+use arrow_schema::Schema;
 use bytes::Bytes;
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
 use serde::Serialize;
+use serde_json::Value;
 
-/// Migrate the metadata from v1 or v2 to v3
+/// Migrate the metadata from v1 or v2 to v3
 /// This is a one time migration
-pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
+pub async fn run_metadata_migration(
+    config: &Config,
+    parseable_json: &Option<Bytes>,
+) -> anyhow::Result<()> {
     let object_store = config.storage().get_object_store();
-    let storage_metadata = get_storage_metadata(&*object_store).await?;
+    let mut storage_metadata: Option<Value> = None;
+    if parseable_json.is_some() {
+        storage_metadata = serde_json::from_slice(parseable_json.as_ref().unwrap())
+            .expect("parseable config is valid json");
+    }
     let staging_metadata = get_staging_metadata(config)?;
 
     fn get_version(metadata: &serde_json::Value) -> Option<&str> {
@@ -114,9 +125,14 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
     if stream_metadata.is_empty() {
         return Ok(());
     }
-    let stream_metadata: serde_json::Value =
+    let mut stream_metadata: serde_json::Value =
         serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
 
+    let schema_path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
+    let schema = storage.get_object(&schema_path).await?;
+
+    let mut arrow_schema: Schema = Schema::empty();
+
     let version = stream_metadata
         .as_object()
         .and_then(|meta| meta.get("version"))
@@ -124,40 +140,55 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
 
     match version {
         Some("v1") => {
-            let new_stream_metadata = stream_metadata_migration::v1_v4(stream_metadata);
+            stream_metadata = stream_metadata_migration::v1_v4(stream_metadata);
             storage
-                .put_object(&path, to_bytes(&new_stream_metadata))
+                .put_object(&path, to_bytes(&stream_metadata))
                 .await?;
-
-            let schema_path =
-                RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
-            let schema = storage.get_object(&schema_path).await?;
             let schema = serde_json::from_slice(&schema).ok();
-            let map = schema_migration::v1_v4(schema)?;
-            storage.put_object(&schema_path, to_bytes(&map)).await?;
+            arrow_schema = schema_migration::v1_v4(schema)?;
+            storage
+                .put_object(&schema_path, to_bytes(&arrow_schema))
+                .await?;
         }
         Some("v2") => {
-            let new_stream_metadata = stream_metadata_migration::v2_v4(stream_metadata);
+            stream_metadata = stream_metadata_migration::v2_v4(stream_metadata);
             storage
-                .put_object(&path, to_bytes(&new_stream_metadata))
+                .put_object(&path, to_bytes(&stream_metadata))
                 .await?;
 
-            let schema_path =
-                RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
-            let schema = storage.get_object(&schema_path).await?;
             let schema = serde_json::from_slice(&schema)?;
-            let map = schema_migration::v2_v4(schema)?;
-            storage.put_object(&schema_path, to_bytes(&map)).await?;
+            arrow_schema = schema_migration::v2_v4(schema)?;
+            storage
+                .put_object(&schema_path, to_bytes(&arrow_schema))
+                .await?;
         }
         Some("v3") => {
-            let new_stream_metadata = stream_metadata_migration::v3_v4(stream_metadata);
+            stream_metadata = stream_metadata_migration::v3_v4(stream_metadata);
             storage
-                .put_object(&path, to_bytes(&new_stream_metadata))
+                .put_object(&path, to_bytes(&stream_metadata))
                 .await?;
         }
         _ => (),
     }
 
+    if arrow_schema.fields().is_empty() {
+        arrow_schema = serde_json::from_slice(&schema)?;
+    }
+
+    //load stream metadata from storage
+    let meta: ObjectStoreFormat =
+        serde_json::from_slice(&serde_json::to_vec(&stream_metadata).unwrap()).unwrap();
+    if let Err(err) =
+        load_stream_metadata_on_server_start(storage, stream, arrow_schema, &meta).await
+    {
+        log::error!("could not populate local metadata. {:?}", err);
+        return Err(err.into());
+    }
+
+    //load stats from storage
+    let stats = meta.stats;
+    fetch_stats_from_storage(stream, stats).await;
+
     Ok(())
 }
 
@@ -169,8 +200,7 @@ pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
 }
 
 pub fn get_staging_metadata(config: &Config) -> anyhow::Result<Option<serde_json::Value>> {
-    let path = parseable_json_path().to_path(config.staging_dir());
-
+    let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.staging_dir());
     let bytes = match std::fs::read(path) {
         Ok(bytes) => bytes,
         Err(err) => match err.kind() {
@@ -183,24 +213,6 @@ pub fn get_staging_metadata(config: &Config) -> anyhow::Result<Option<serde_json::Value>>
-async fn get_storage_metadata(
-    storage: &dyn ObjectStorage,
-) -> anyhow::Result<Option<serde_json::Value>> {
-    let path = parseable_json_path();
-    match storage.get_object(&path).await {
-        Ok(bytes) => Ok(Some(
-            serde_json::from_slice(&bytes).expect("parseable config is valid json"),
-        )),
-        Err(err) => {
-            if matches!(err, ObjectStorageError::NoSuchKey(_)) {
-                Ok(None)
-            } else {
-                Err(err.into())
-            }
-        }
-    }
-}
-
 pub async fn put_remote_metadata(
     storage: &dyn ObjectStorage,
     metadata: &serde_json::Value,
@@ -244,8 +256,6 @@ async fn run_meta_file_migration(
     object_store: &Arc<dyn ObjectStorage>,
     old_meta_file_path: RelativePathBuf,
 ) -> anyhow::Result<()> {
-    log::info!("Migrating metadata files to new location");
-
     // get the list of all meta files
     let mut meta_files = object_store.get_ingestor_meta_file_paths().await?;
     meta_files.push(old_meta_file_path);
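`migration_stream` walks a version ladder: each arm rewrites `stream.json` (and, where formats changed, the schema) up to v4, after which the already-migrated metadata is loaded straight into memory. A compact, runnable sketch of the dispatch shape, with string-append placeholders for the real `stream_metadata_migration` helpers:

```rust
// Placeholders for stream_metadata_migration::v*_v4; only the ladder matters.
fn v1_v4(mut meta: String) -> String { meta.push_str(" +v1->v4"); meta }
fn v2_v4(mut meta: String) -> String { meta.push_str(" +v2->v4"); meta }
fn v3_v4(mut meta: String) -> String { meta.push_str(" +v3->v4"); meta }

fn migrate(version: Option<&str>, meta: String) -> String {
    match version {
        Some("v1") => v1_v4(meta),
        Some("v2") => v2_v4(meta),
        Some("v3") => v3_v4(meta),
        // Already current (v4) or unversioned: pass through untouched,
        // after which the caller loads it into the in-memory registry.
        _ => meta,
    }
}

fn main() {
    println!("{}", migrate(Some("v2"), "stream.json".to_string()));
    println!("{}", migrate(Some("v4"), "stream.json".to_string()));
}
```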
diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs
index bbe1c1548..10bfa940c 100644
--- a/server/src/migration/metadata_migration.rs
+++ b/server/src/migration/metadata_migration.rs
@@ -148,13 +148,12 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
     storage_metadata
 }
 
-pub async fn migrate_ingester_metadata() -> anyhow::Result<()> {
+pub async fn migrate_ingester_metadata() -> anyhow::Result<Option<IngestorMetadata>> {
     let imp = ingestor_metadata_path(None);
     let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {
         Ok(bytes) => bytes,
         Err(_) => {
-            log::debug!("No metadata found for ingester. So migration is not required");
-            return Ok(());
+            return Ok(None);
         }
     };
     let mut json = serde_json::from_slice::<JsonValue>(&bytes)?;
@@ -182,5 +181,5 @@ pub async fn migrate_ingester_metadata() -> anyhow::Result<()>
         .put_object(&imp, bytes)
         .await?;
 
-    Ok(())
+    Ok(Some(resource))
 }
diff --git a/server/src/option.rs b/server/src/option.rs
index e607c2062..1e69cadf4 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -16,6 +16,10 @@
  *
  */
 
+use crate::cli::Cli;
+use crate::storage::object_storage::parseable_json_path;
+use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
+use bytes::Bytes;
 use clap::error::ErrorKind;
 use clap::{command, Args, Command, FromArgMatches};
 use core::fmt;
@@ -24,10 +28,6 @@ use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
 use std::env;
 use std::path::PathBuf;
 use std::sync::Arc;
-
-use crate::cli::Cli;
-use crate::storage::object_storage::parseable_json_path;
-use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
 
 pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB
 pub const JOIN_COMMUNITY: &str =
     "Join us on Parseable Slack community for questions : https://logg.ing/community";
@@ -111,11 +111,14 @@ Log Lake for the cloud-native world
 
     // validate the storage, if the proper path for staging directory is provided
     // if the proper data directory is provided, or s3 bucket is provided etc
-    pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> {
+    pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {
         let obj_store = self.storage.get_object_store();
         let rel_path = parseable_json_path();
-
-        let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();
+        let mut has_parseable_json = false;
+        let parseable_json_result = obj_store.get_object(&rel_path).await;
+        if parseable_json_result.is_ok() {
+            has_parseable_json = true;
+        }
 
         // Lists all the directories in the root of the bucket/directory
         // can be a stream (if it contains .stream.json file) or not
@@ -125,9 +128,11 @@ Log Lake for the cloud-native world
         };
 
         let has_streams = obj_store.list_streams().await.is_ok();
-
-        if has_streams || !has_dirs && !has_parseable_json {
-            return Ok(());
+        if !has_dirs && !has_parseable_json {
+            return Ok(None);
+        }
+        if has_streams {
+            return Ok(Some(parseable_json_result.unwrap()));
         }
 
         if self.get_storage_mode_string() == "Local drive" {
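`validate_storage` now distinguishes three situations instead of collapsing them into `Ok(())`: a brand-new bucket (`Ok(None)`), an existing deployment whose `.parseable.json` bytes are handed onward (`Ok(Some(bytes))`), and storage that has data but no streams. A sketch of that branching — plain types stand in for the object-store probes and `Bytes`, and the error branch condenses the mode-specific handling of the real function:

```rust
// Illustrative only: the real checks are async object-store probes.
fn validate_storage(
    has_parseable_json: bool,
    has_dirs: bool,
    has_streams: bool,
    payload: Option<Vec<u8>>,
) -> Result<Option<Vec<u8>>, String> {
    if !has_dirs && !has_parseable_json {
        // Brand-new bucket: nothing to migrate or resolve downstream.
        return Ok(None);
    }
    if has_streams {
        // Existing deployment: hand the .parseable.json bytes onward so
        // migration and metadata resolution can reuse them.
        return Ok(payload);
    }
    Err("storage has data but is not a parseable deployment".to_string())
}

fn main() {
    assert_eq!(validate_storage(false, false, false, None).unwrap(), None);
    let json = br#"{"version":"v4"}"#.to_vec();
    let out = validate_storage(true, true, true, Some(json.clone())).unwrap();
    assert_eq!(out, Some(json));
}
```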
diff --git a/server/src/query.rs b/server/src/query.rs
index 7239467c0..22e9bb880 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -43,6 +43,7 @@ use self::error::ExecuteError;
 use self::stream_schema_provider::GlobalSchemaProvider;
 pub use self::stream_schema_provider::PartialTimeFilter;
 use crate::event;
+use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::storage::{ObjectStorageProvider, StorageDir};
 
@@ -106,9 +107,7 @@ impl Query {
         &self,
         stream_name: String,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
-        let store = CONFIG.storage().get_object_store();
-        let object_store_format = store.get_object_store_format(&stream_name).await?;
-        let time_partition = object_store_format.time_partition;
+        let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
 
         let df = QUERY_SESSION
             .execute_logical_plan(self.final_logical_plan(&time_partition))
@@ -389,7 +388,7 @@ pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
 }
 
 pub mod error {
-    use crate::storage::ObjectStorageError;
+    use crate::{metadata::error::stream_info::MetadataError, storage::ObjectStorageError};
     use datafusion::error::DataFusionError;
 
     #[derive(Debug, thiserror::Error)]
@@ -398,6 +397,8 @@ pub mod error {
         ObjectStorage(#[from] ObjectStorageError),
         #[error("Query Execution failed due to error in datafusion: {0}")]
         Datafusion(#[from] DataFusionError),
+        #[error("Query Execution failed due to error in fetching metadata: {0}")]
+        Metadata(#[from] MetadataError),
     }
 }
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 6e11fdf6b..a5b099f21 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -36,18 +36,18 @@ use crate::{
     metadata::STREAM_INFO,
     metrics::{storage::StorageMetrics, STORAGE_SIZE},
     option::CONFIG,
-    stats::{self, FullStats, Stats},
+    stats::FullStats,
 };
 
 use actix_web_prometheus::PrometheusMetrics;
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
+use chrono::Local;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
 use itertools::Itertools;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
-use serde_json::Value;
 
 use std::{
     collections::HashMap,
@@ -132,11 +132,12 @@ pub trait ObjectStorage: Sync + 'static {
         custom_partition: &str,
         static_schema_flag: &str,
         schema: Arc<Schema>,
-    ) -> Result<(), ObjectStorageError> {
+    ) -> Result<String, ObjectStorageError> {
         let mut format = ObjectStoreFormat::default();
         format.set_id(CONFIG.parseable.username.clone());
         let permission = Permisssion::new(CONFIG.parseable.username.clone());
         format.permissions = vec![permission];
+        format.created_at = Local::now().to_rfc3339();
         if time_partition.is_empty() {
             format.time_partition = None;
         } else {
@@ -164,7 +165,7 @@ pub trait ObjectStorage: Sync + 'static {
         self.put_object(&stream_json_path(stream_name), format_json)
             .await?;
 
-        Ok(())
+        Ok(format.created_at)
     }
 
     async fn update_time_partition_limit_in_stream(
@@ -252,7 +253,7 @@ pub trait ObjectStorage: Sync + 'static {
             .await
     }
 
-    async fn get_schema_on_server_start(
+    async fn upsert_schema_to_storage(
         &self,
         stream_name: &str,
     ) -> Result<Schema, ObjectStorageError> {
         let schema_path = schema_path(stream_name);
         let byte_data = match self.get_object(&schema_path).await {
             Ok(bytes) => bytes,
-            Err(err) => {
-                log::info!("{:?}", err);
+            Err(_) => {
                 // base schema path
                 let schema_path = RelativePathBuf::from_iter([
                     stream_name,
@@ -303,7 +303,7 @@ pub trait ObjectStorage: Sync + 'static {
         }
     }
 
-    async fn get_stream_metadata(
+    async fn upsert_stream_metadata(
         &self,
         stream_name: &str,
     ) -> Result<ObjectStoreFormat, ObjectStorageError> {
@@ -344,52 +344,6 @@ pub trait ObjectStorage: Sync + 'static {
         self.put_object(&path, to_bytes(manifest)).await
     }
 
-    /// for future use
-    #[allow(dead_code)]
-    async fn get_stats_for_first_time(
-        &self,
-        stream_name: &str,
-    ) -> Result<Stats, ObjectStorageError> {
-        let path = RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]);
-        let stream_metadata = self.get_object(&path).await?;
-        let stream_metadata: Value =
-            serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
-        let stats = &stream_metadata["stats"];
-
-        let stats = serde_json::from_value(stats.clone()).unwrap_or_default();
-
-        Ok(stats)
-    }
-
valid json"); - - let stats = &stream_metadata["stats"]; - - let stats = serde_json::from_value(stats.clone()).unwrap_or_default(); - - Ok(stats) - } - - async fn get_retention(&self, stream_name: &str) -> Result { - let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; - let stream_metadata: Value = - serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); - - let retention = stream_metadata - .as_object() - .expect("is object") - .get("retention") - .cloned(); - if let Some(retention) = retention { - Ok(serde_json::from_value(retention)?) - } else { - Ok(Retention::default()) - } - } - async fn get_metadata(&self) -> Result, ObjectStorageError> { let parseable_metadata: Option = match self.get_object(&parseable_json_path()).await { @@ -408,15 +362,6 @@ pub trait ObjectStorage: Sync + 'static { Ok(parseable_metadata) } - async fn stream_exists(&self, stream_name: &str) -> Result { - let res = self.get_object(&stream_json_path(stream_name)).await; - match res { - Ok(_) => Ok(true), - Err(ObjectStorageError::NoSuchKey(_)) => Ok(false), - Err(e) => Err(e), - } - } - // get the manifest info async fn get_manifest( &self, @@ -461,7 +406,7 @@ pub trait ObjectStorage: Sync + 'static { stream: &str, snapshot: Snapshot, ) -> Result<(), ObjectStorageError> { - let mut stream_meta = self.get_stream_metadata(stream).await?; + let mut stream_meta = self.upsert_stream_metadata(stream).await?; stream_meta.snapshot = snapshot; self.put_object(&stream_json_path(stream), to_bytes(&stream_meta)) .await @@ -543,12 +488,6 @@ pub trait ObjectStorage: Sync + 'static { let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); catalog::update_snapshot(store, stream, manifest).await?; - let stats = stats::get_current_stats(stream, "json"); - if let Some(stats) = stats { - if let Err(e) = self.put_stats(stream, &stats).await { - log::warn!("Error updating stats to objectstore due to error [{}]", e); - } - } if cache_enabled && cache_manager.is_some() { cache_updates .entry(stream) diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index 7e697b3ae..56c0efa0c 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -29,7 +29,6 @@ use derive_more::Display; use once_cell::sync::Lazy; use crate::metadata::STREAM_INFO; -use crate::option::CONFIG; type SchedulerHandle = thread::JoinHandle<()>; @@ -54,15 +53,14 @@ pub fn init_scheduler() { let func = move || async { //get retention every day at 12 am for stream in STREAM_INFO.list_streams() { - let res = CONFIG - .storage() - .get_object_store() - .get_retention(&stream) - .await; + let retention = STREAM_INFO.get_retention(&stream); - match res { + match retention { Ok(config) => { - for Task { action, days, .. } in config.tasks.into_iter() { + if config.is_none() { + continue; + } + for Task { action, days, .. 
diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs
index 7e697b3ae..56c0efa0c 100644
--- a/server/src/storage/retention.rs
+++ b/server/src/storage/retention.rs
@@ -29,7 +29,6 @@ use derive_more::Display;
 use once_cell::sync::Lazy;
 
 use crate::metadata::STREAM_INFO;
-use crate::option::CONFIG;
 
 type SchedulerHandle = thread::JoinHandle<()>;
 
@@ -54,15 +53,14 @@ pub fn init_scheduler() {
     let func = move || async {
         //get retention every day at 12 am
         for stream in STREAM_INFO.list_streams() {
-            let res = CONFIG
-                .storage()
-                .get_object_store()
-                .get_retention(&stream)
-                .await;
+            let retention = STREAM_INFO.get_retention(&stream);
 
-            match res {
+            match retention {
                 Ok(config) => {
-                    for Task { action, days, .. } in config.tasks.into_iter() {
+                    if config.is_none() {
+                        continue;
+                    }
+                    for Task { action, days, .. } in config.unwrap().tasks.into_iter() {
                         match action {
                             Action::Delete => {
                                 let stream = stream.to_string();
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index d3ecd4040..e6393b27b 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -22,7 +22,9 @@ use std::{
     path::PathBuf,
 };
 
+use bytes::Bytes;
 use once_cell::sync::OnceCell;
+use relative_path::RelativePathBuf;
 use std::io;
 
 use crate::{
@@ -33,11 +35,11 @@ use crate::{
     utils::uid,
 };
 
-use super::{object_storage::parseable_json_path, PARSEABLE_METADATA_FILE_NAME};
+use super::PARSEABLE_METADATA_FILE_NAME;
 
 // Expose some static variables for internal usage
 pub static STORAGE_METADATA: OnceCell<StaticStorageMetadata> = OnceCell::new();
-
+pub const CURRENT_STORAGE_METADATA_VERSION: &str = "v4";
 // For use in global static
 #[derive(Debug, PartialEq, Eq)]
 pub struct StaticStorageMetadata {
@@ -66,7 +68,7 @@ pub struct StorageMetadata {
 impl StorageMetadata {
     pub fn new() -> Self {
         Self {
-            version: "v3".to_string(),
+            version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
             mode: CONFIG.storage_name.to_owned(),
             staging: CONFIG.staging_dir().to_path_buf(),
             storage: CONFIG.storage().get_endpoint(),
@@ -98,14 +100,19 @@ impl StorageMetadata {
 /// deals with the staging directory creation and metadata resolution
 /// always returns remote metadata as it is source of truth
 /// overwrites staging metadata while updating storage info
-pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
+pub async fn resolve_parseable_metadata(
+    parseable_metadata: &Option<Bytes>,
+) -> Result<StorageMetadata, ObjectStorageError> {
     let staging_metadata = get_staging_metadata()?;
-    let storage = CONFIG.storage().get_object_store();
-    let remote_metadata = storage.get_metadata().await?;
-
+    let mut remote_metadata: Option<StorageMetadata> = None;
+    if parseable_metadata.is_some() {
+        remote_metadata = Some(
+            serde_json::from_slice(parseable_metadata.as_ref().unwrap())
+                .expect("parseable config is valid json"),
+        );
+    }
     // Env Change needs to be updated
     let check = determine_environment(staging_metadata, remote_metadata);
-
     // flags for if metadata needs to be synced
     let mut overwrite_staging = false;
     let mut overwrite_remote = false;
@@ -115,7 +122,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError>
 }
 
 pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
-    let path = parseable_json_path().to_path(CONFIG.staging_dir());
+    let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(CONFIG.staging_dir());
     let bytes = match fs::read(path) {
         Ok(bytes) => bytes,
         Err(err) => match err.kind() {
diff --git a/server/src/validator.rs b/server/src/validator.rs
index db0a7ecab..a1648ff22 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -24,17 +24,7 @@ use crate::handlers::http::cluster::INTERNAL_STREAM_NAME;
 
 // Add more sql keywords here in lower case
 const DENIED_NAMES: &[&str] = &[
-    "select",
-    "from",
-    "where",
-    "group",
-    "by",
-    "order",
-    "limit",
-    "offset",
-    "join",
-    "and",
-    INTERNAL_STREAM_NAME,
+    "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and",
 ];
 
 pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {
@@ -125,6 +115,12 @@ pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> {
         ));
     }
 
+    if stream_name == INTERNAL_STREAM_NAME {
+        return Err(StreamNameValidationError::InternalStream(
+            stream_name.to_owned(),
+        ));
+    }
+
     Ok(())
 }
 
@@ -182,6 +178,8 @@ pub mod error {
         NameUpperCase(String),
         #[error("SQL keyword cannot be used as stream name")]
         SQLKeyword(String),
+        #[error("`pmeta` is an internal stream name and cannot be used.")]
+        InternalStream(String),
     }
 
     #[derive(Debug, thiserror::Error)]
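Finally, the validator split gives the internal stream name its own rejection path instead of hiding it in the SQL-keyword deny list, which is what makes the dedicated `InternalStream` error message possible. A runnable distillation — `"pmeta"` mirrors `INTERNAL_STREAM_NAME`, and the surrounding types are simplified:

```rust
// SQL keywords stay in a deny list; the internal stream name now gets its
// own check and error variant, so callers can tell the rejections apart.
const DENIED_NAMES: &[&str] = &[
    "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and",
];
const INTERNAL_STREAM_NAME: &str = "pmeta";

#[derive(Debug)]
enum StreamNameValidationError {
    SQLKeyword(String),
    InternalStream(String),
}

fn stream_name(name: &str) -> Result<(), StreamNameValidationError> {
    if DENIED_NAMES.contains(&name) {
        return Err(StreamNameValidationError::SQLKeyword(name.to_owned()));
    }
    if name == INTERNAL_STREAM_NAME {
        return Err(StreamNameValidationError::InternalStream(name.to_owned()));
    }
    Ok(())
}

fn main() {
    assert!(stream_name("app_logs").is_ok());
    println!("{:?}", stream_name("pmeta").unwrap_err());
    println!("{:?}", stream_name("select").unwrap_err());
}
```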