diff --git a/server/src/event.rs b/server/src/event.rs
index 9e8f09a6e..eda5dd889 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -27,7 +27,7 @@ use std::sync::Arc;
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
-use crate::{handlers::http::ingest::PostError, metadata};
+use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
 use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
@@ -45,6 +45,7 @@ pub struct Event {
     pub parsed_timestamp: NaiveDateTime,
     pub time_partition: Option<String>,
     pub custom_partition_values: HashMap<String, String>,
+    pub stream_type: StreamType,
 }
 
 // Events holds the schema related to a each event for a single log stream
@@ -75,6 +76,7 @@ impl Event {
             self.rb.clone(),
             self.parsed_timestamp,
             &self.custom_partition_values,
+            &self.stream_type,
         )?;
 
         metadata::STREAM_INFO.update_stats(
@@ -106,6 +108,7 @@ impl Event {
             self.rb.clone(),
             self.parsed_timestamp,
             &self.custom_partition_values,
+            &self.stream_type,
         )
         .map_err(PostError::Event)
     }
@@ -122,6 +125,7 @@ impl Event {
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), EventError> {
         STREAM_WRITERS.append_to_local(
             stream_name,
@@ -129,6 +133,7 @@
             rb,
             parsed_timestamp,
             custom_partition_values.clone(),
+            stream_type,
         )?;
         Ok(())
     }
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 0090a47f2..af31fc54f 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -26,8 +26,8 @@ use std::{
 };
 
 use crate::{
-    handlers::http::cluster::INTERNAL_STREAM_NAME,
     option::{Mode, CONFIG},
+    storage::StreamType,
     utils,
 };
 
@@ -92,6 +92,7 @@ impl WriterTable {
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
         let hashmap_guard = self.read().unwrap();
 
@@ -104,6 +105,7 @@ impl WriterTable {
                     record,
                     parsed_timestamp,
                     &custom_partition_values,
+                    stream_type,
                 )?;
             }
             None => {
@@ -118,12 +120,14 @@ impl WriterTable {
                     record,
                     parsed_timestamp,
                     &custom_partition_values,
+                    stream_type,
                 )?;
             }
         };
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn handle_existing_writer(
         &self,
         stream_writer: &Mutex<Writer>,
@@ -132,8 +136,9 @@
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
-        if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+        if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
             stream_writer.lock().unwrap().push(
                 stream_name,
                 schema_key,
@@ -151,6 +156,7 @@
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn handle_missing_writer(
         &self,
         mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
@@ -159,10 +165,11 @@
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
         match map.get(stream_name) {
             Some(writer) => {
-                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+                if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
                     writer.lock().unwrap().push(
                         stream_name,
                         schema_key,
@@ -175,7 +182,7 @@
                 }
             }
             None => {
-                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+                if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
                     let mut writer = Writer::default();
                     writer.push(
                         stream_name,
diff --git a/server/src/handlers.rs b/server/src/handlers.rs
index 78e7b2da9..60f64fd63 100644
--- a/server/src/handlers.rs
+++ b/server/src/handlers.rs
@@ -34,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
 const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';
 const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
+const STREAM_TYPE_KEY: &str = "x-p-stream-type";
 const OIDC_SCOPE: &str = "openid profile email";
 const COOKIE_AGE_DAYS: usize = 7;
 const SESSION_COOKIE_NAME: &str = "session";
diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index b3bd78334..d91284a08 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -30,11 +30,11 @@ use crate::stats::Stats;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
-use actix_web::http::header;
+use actix_web::http::header::{self, HeaderMap};
 use actix_web::{HttpRequest, Responder};
 use bytes::Bytes;
 use chrono::Utc;
-use http::StatusCode;
+use http::{header as http_header, StatusCode};
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
 use serde::de::Error;
@@ -96,10 +96,15 @@ pub async fn sync_cache_with_ingestors(
 // forward the request to all ingestors to keep them in sync
 pub async fn sync_streams_with_ingestors(
-    req: HttpRequest,
+    headers: HeaderMap,
     body: Bytes,
     stream_name: &str,
 ) -> Result<(), StreamError> {
+    let mut reqwest_headers = http_header::HeaderMap::new();
+
+    for (key, value) in headers.iter() {
+        reqwest_headers.insert(key.clone(), value.clone());
+    }
     let ingestor_infos = get_ingestor_info().await.map_err(|err| {
         log::error!("Fatal: failed to get ingestor info: {:?}", err);
         StreamError::Anyhow(err)
@@ -119,7 +124,7 @@
         );
         let res = client
             .put(url)
-            .headers(req.headers().into())
+            .headers(reqwest_headers.clone())
            .header(header::AUTHORIZATION, &ingestor.token)
             .body(body.clone())
             .send()
@@ -572,7 +577,6 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
 
 pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
     log::info!("Setting up schedular for cluster metrics ingestion");
-
     let mut scheduler = AsyncScheduler::new();
     scheduler
         .every(CLUSTER_METRICS_INTERVAL_SECONDS)
@@ -583,11 +587,9 @@
             if !metrics.is_empty() {
                 log::info!("Cluster metrics fetched successfully from all ingestors");
                 if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                    let stream_name = INTERNAL_STREAM_NAME;
-
                     if matches!(
                         ingest_internal_stream(
-                            stream_name.to_string(),
+                            INTERNAL_STREAM_NAME.to_string(),
                             bytes::Bytes::from(metrics_bytes),
                         )
                         .await,
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 93b40f35d..698bda10a 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -16,8 +16,7 @@
  *
  */
 
-use super::cluster::INTERNAL_STREAM_NAME;
-use super::logstream::error::CreateStreamError;
+use super::logstream::error::{CreateStreamError, StreamError};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 use super::{kinesis, otel};
@@ -34,7 +33,7 @@ use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::{self, STREAM_INFO};
 use crate::option::{Mode, CONFIG};
-use crate::storage::{LogStream, ObjectStorageError};
+use crate::storage::{LogStream, ObjectStorageError, StreamType};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 use crate::utils::json::convert_array_to_object;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -57,13 +56,14 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result Result<(), PostError> {
-    create_stream_if_not_exists(&stream_name, true).await?;
     let size: usize = body.len();
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
@@ -100,6 +99,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         parsed_timestamp,
         time_partition: None,
         custom_partition_values: HashMap::new(),
+        stream_type: StreamType::Internal,
     }
     .process()
     .await?;
@@ -116,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result Result {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if stream_name.eq(INTERNAL_STREAM_NAME) {
+    let internal_stream_names = STREAM_INFO.list_internal_streams();
+    if internal_stream_names.contains(&stream_name) {
         return Err(PostError::Invalid(anyhow::anyhow!(
             "Stream {} is an internal stream and cannot be ingested into",
             stream_name
@@ -202,6 +203,7 @@ pub async fn push_logs_unchecked(
         time_partition: None,
         is_first_event: true,                    // NOTE: Maybe should be false
         custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
+        stream_type: StreamType::UserDefined,
     };
     unchecked_event.process_unchecked()?;
 
@@ -369,6 +371,7 @@ async fn create_process_record_batch(
             parsed_timestamp,
             time_partition: time_partition.clone(),
             custom_partition_values: custom_partition_values.clone(),
+            stream_type: StreamType::UserDefined,
         }
         .process()
         .await?;
@@ -413,7 +416,7 @@ fn into_event_batch(
 // Check if the stream exists and create a new stream if doesn't exist
 pub async fn create_stream_if_not_exists(
     stream_name: &str,
-    internal_stream: bool,
+    stream_type: &str,
 ) -> Result<(), PostError> {
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
@@ -427,7 +430,7 @@
             "",
             "",
             Arc::new(Schema::empty()),
-            internal_stream,
+            stream_type,
         )
         .await?;
     }
@@ -488,6 +491,8 @@ pub enum PostError {
     DashboardError(#[from] DashboardError),
     #[error("Error: {0}")]
     CacheError(#[from] CacheError),
+    #[error("Error: {0}")]
+    StreamError(#[from] StreamError),
 }
 
 impl actix_web::ResponseError for PostError {
@@ -509,6 +514,7 @@
             PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index c8282d560..080761b7d 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -21,11 +21,13 @@ use super::base_path_without_preceding_slash;
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
 use super::cluster::{
     fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
+    INTERNAL_STREAM_NAME,
 };
+use super::ingest::create_stream_if_not_exists;
 use crate::alerts::Alerts;
 use crate::handlers::{
-    CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
-    UPDATE_STREAM_KEY,
+    CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
+    TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
 };
 use crate::hottier::{HotTierManager, StreamHotTier};
 use crate::metadata::STREAM_INFO;
@@ -34,6 +36,7 @@ use crate::option::validation::bytes_to_human_size;
 use crate::option::{Mode, CONFIG};
 use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
+use crate::storage::StreamType;
 use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
 use crate::{
     catalog::{self, remove_manifest_from_snapshot},
@@ -41,16 +44,19 @@
 };
 use crate::{metadata, validator};
 
+use actix_web::http::header::{self, HeaderMap};
 use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, Responder};
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::Utc;
+use http::{HeaderName, HeaderValue};
 use itertools::Itertools;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::fs;
 use std::num::NonZeroU32;
+use std::str::FromStr;
 use std::sync::Arc;
 
 pub async fn delete(req: HttpRequest) -> Result {
@@ -176,8 +182,8 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result
-        create_update_stream(&req, &body, &stream_name).await?;
-        sync_streams_with_ingestors(req, body, &stream_name).await?;
+        let headers = create_update_stream(&req, &body, &stream_name).await?;
+        sync_streams_with_ingestors(headers, body, &stream_name).await?;
 fn fetch_headers_from_put_stream_request(
     req: &HttpRequest,
-) -> (String, String, String, String, String) {
+) -> (String, String, String, String, String, String) {
     let mut time_partition = String::default();
     let mut time_partition_limit = String::default();
     let mut custom_partition = String::default();
     let mut static_schema_flag = String::default();
     let mut update_stream = String::default();
+    let mut stream_type = StreamType::UserDefined.to_string();
     req.headers().iter().for_each(|(key, value)| {
         if key == TIME_PARTITION_KEY {
             time_partition = value.to_str().unwrap().to_string();
@@ -209,6 +216,9 @@
         if key == UPDATE_STREAM_KEY {
             update_stream = value.to_str().unwrap().to_string();
         }
+        if key == STREAM_TYPE_KEY {
+            stream_type = value.to_str().unwrap().to_string();
+        }
     });
 
     (
@@ -217,6 +227,7 @@
         custom_partition,
         static_schema_flag,
         update_stream,
+        stream_type,
     )
 }
 
@@ -303,9 +314,15 @@ async fn create_update_stream(
     req: &HttpRequest,
     body: &Bytes,
     stream_name: &str,
-) -> Result<(), StreamError> {
-    let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
-        fetch_headers_from_put_stream_request(req);
+) -> Result<HeaderMap, StreamError> {
+    let (
+        time_partition,
+        time_partition_limit,
+        custom_partition,
+        static_schema_flag,
+        update_stream,
+        stream_type,
+    ) = fetch_headers_from_put_stream_request(req);
 
     if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
         return Err(StreamError::Custom {
@@ -338,17 +355,16 @@
             let time_partition_days = validate_time_partition_limit(&time_partition_limit)?;
             update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days)
                 .await?;
-            return Ok(());
+            return Ok(req.headers().clone());
         }
 
         if !custom_partition.is_empty() {
             validate_custom_partition(&custom_partition)?;
             update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?;
-            return Ok(());
         } else {
             update_custom_partition_in_stream(stream_name.to_string(), "").await?;
-            return Ok(());
         }
+        return Ok(req.headers().clone());
     }
     let mut time_partition_in_days = "";
     if !time_partition_limit.is_empty() {
@@ -377,11 +393,11 @@
         &custom_partition,
        &static_schema_flag,
         schema,
-        false,
+        &stream_type,
     )
     .await?;
 
-    Ok(())
+    Ok(req.headers().clone())
 }
 
 pub async fn put_alert(
     req: HttpRequest,
@@ -646,7 +662,9 @@ pub async fn get_stats(req: HttpRequest) -> Result
     let stats = stats::get_current_stats(&stream_name, "json")
         .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
 
-    let ingestor_stats = if CONFIG.parseable.mode == Mode::Query {
+    let ingestor_stats = if CONFIG.parseable.mode == Mode::Query
+        && STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::UserDefined.to_string()
+    {
         Some(fetch_stats_from_ingestors(&stream_name).await?)
     } else {
         None
@@ -835,11 +853,11 @@ pub async fn create_stream(
     custom_partition: &str,
     static_schema_flag: &str,
     schema: Arc<Schema>,
-    internal_stream: bool,
+    stream_type: &str,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
-    if !internal_stream {
-        validator::stream_name(&stream_name)?;
+    if stream_type != StreamType::Internal.to_string() {
+        validator::stream_name(&stream_name, stream_type)?;
     }
     // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
@@ -852,6 +870,7 @@
             custom_partition,
             static_schema_flag,
             schema.clone(),
+            stream_type,
         )
         .await
     {
@@ -874,6 +893,7 @@
                 custom_partition.to_string(),
                 static_schema_flag.to_string(),
                 static_schema,
+                stream_type,
             );
         }
         Err(err) => {
@@ -908,6 +928,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result Result Result<(), StreamError> {
+    if create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string())
+        .await
+        .is_ok()
+    {
+        let mut header_map = HeaderMap::new();
+        header_map.insert(
+            HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
+            HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
+        );
+        header_map.insert(
+            header::CONTENT_TYPE,
+            HeaderValue::from_static("application/json"),
+        );
+        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
+    }
+    Ok(())
+}
 
 #[allow(unused)]
 fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
     match kind {
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 73a44faf1..c731ea5ca 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -18,6 +18,7 @@
 
 use crate::handlers::airplane;
 use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
+use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
 use crate::handlers::http::middleware::RouteExt;
 use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
 use crate::hottier::HotTierManager;
@@ -174,6 +175,9 @@ impl QueryServer {
 
         migration::run_migration(&CONFIG).await?;
 
+        //create internal stream at server start
+        create_internal_stream_if_not_exists().await?;
+
         FILTERS.load().await?;
         DASHBOARDS.load().await?;
         // track all parquet files already in the data directory
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 40b47720a..49a7843a4 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -36,7 +36,7 @@ use crate::metrics::{
 use crate::option::{Mode, CONFIG};
 use crate::storage::retention::Retention;
 use crate::storage::{
-    LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, STREAM_METADATA_FILE_NAME,
+    LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType, STREAM_METADATA_FILE_NAME,
     STREAM_ROOT_DIRECTORY,
 };
 use crate::utils::arrow::MergedRecordReader;
@@ -62,6 +62,7 @@ pub struct LogStreamMetadata {
     pub custom_partition: Option<String>,
     pub static_schema_flag: Option<String>,
     pub hot_tier_enabled: Option<bool>,
+    pub stream_type: String,
 }
 
 // It is very unlikely that panic will occur when dealing with metadata.
@@ -268,6 +269,7 @@ impl StreamInfo {
         custom_partition: String,
         static_schema_flag: String,
         static_schema: HashMap<String, Arc<Field>>,
+        stream_type: &str,
     ) {
         let mut map = self.write().expect(LOCK_EXPECT);
         let metadata = LogStreamMetadata {
@@ -301,6 +303,7 @@
             } else {
                 static_schema
             },
+            stream_type: stream_type.to_string(),
             ..Default::default()
         };
         map.insert(stream_name, metadata);
@@ -341,6 +344,7 @@
             custom_partition: meta.custom_partition,
             static_schema_flag: meta.static_schema_flag,
             hot_tier_enabled: meta.hot_tier_enabled,
+            stream_type: meta.stream_type,
         };
 
         let mut map = self.write().expect(LOCK_EXPECT);
@@ -357,6 +361,22 @@
             .collect()
     }
 
+    pub fn list_internal_streams(&self) -> Vec<String> {
+        self.read()
+            .expect(LOCK_EXPECT)
+            .iter()
+            .filter(|(_, v)| v.stream_type == StreamType::Internal.to_string())
+            .map(|(k, _)| k.clone())
+            .collect()
+    }
+
+    pub fn stream_type(&self, stream_name: &str) -> Result<String, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.stream_type.clone())
+    }
+
     pub fn update_stats(
         &self,
         stream_name: &str,
@@ -415,6 +435,7 @@ pub async fn load_stream_metadata_on_server_start(
     let mut custom_partition = meta.custom_partition.clone();
     let mut cache_enabled = meta.cache_enabled;
     let mut static_schema_flag = meta.static_schema_flag.clone();
+    let mut stream_type = meta.stream_type.clone();
     if CONFIG.parseable.mode == Mode::Ingest {
         storage.put_schema(stream_name, &schema).await?;
         // get the base stream metadata
@@ -432,7 +453,7 @@
         custom_partition.clone_from(&querier_meta.custom_partition);
         cache_enabled.clone_from(&querier_meta.cache_enabled);
         static_schema_flag.clone_from(&querier_meta.static_schema_flag);
-
+        stream_type.clone_from(&querier_meta.stream_type);
         meta = ObjectStoreFormat {
             retention: retention.clone(),
             cache_enabled,
@@ -440,6 +461,7 @@
             time_partition_limit: time_partition_limit.clone(),
             custom_partition: custom_partition.clone(),
             static_schema_flag: static_schema_flag.clone(),
+            stream_type: stream_type.clone(),
             ..meta.clone()
         };
         storage.put_stream_manifest(stream_name, &meta).await?;
@@ -471,6 +493,7 @@
         custom_partition,
         static_schema_flag: meta.static_schema_flag.clone(),
         hot_tier_enabled: meta.hot_tier_enabled,
+        stream_type: meta.stream_type,
     };
 
     let mut map = STREAM_INFO.write().expect(LOCK_EXPECT);
diff --git a/server/src/migration.rs b/server/src/migration.rs
index bd1bf655b..122f9e9e3 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -144,6 +144,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
     match version {
         Some("v1") => {
             stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
@@ -155,6 +157,8 @@
         }
         Some("v2") => {
             stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
@@ -167,6 +171,15 @@
         }
         Some("v3") => {
             stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
+            storage
+                .put_object(&path, to_bytes(&stream_metadata_value))
+                .await?;
+        }
+        Some("v4") => {
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs
index 609e951bd..82c676f82 100644
--- a/server/src/migration/stream_metadata_migration.rs
+++ b/server/src/migration/stream_metadata_migration.rs
@@ -19,7 +19,10 @@
 
 use serde_json::{json, Value};
 
-use crate::{catalog::snapshot::CURRENT_SNAPSHOT_VERSION, storage};
+use crate::{
+    catalog::snapshot::CURRENT_SNAPSHOT_VERSION, handlers::http::cluster::INTERNAL_STREAM_NAME,
+    storage,
+};
 
 pub fn v1_v4(mut stream_metadata: Value) -> Value {
     let stream_metadata_map = stream_metadata.as_object_mut().unwrap();
@@ -144,6 +147,35 @@ pub fn v3_v4(mut stream_metadata: Value) -> Value {
     stream_metadata
 }
 
+pub fn v4_v5(mut stream_metadata: Value, stream_name: &str) -> Value {
+    let stream_metadata_map: &mut serde_json::Map<String, Value> =
+        stream_metadata.as_object_mut().unwrap();
+    stream_metadata_map.insert(
+        "objectstore-format".to_owned(),
+        Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()),
+    );
+    stream_metadata_map.insert(
+        "version".to_owned(),
+        Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
+    );
+    let stream_type = stream_metadata_map.get("stream_type");
+    if stream_type.is_none() {
+        if stream_name.eq(INTERNAL_STREAM_NAME) {
+            stream_metadata_map.insert(
+                "stream_type".to_owned(),
+                Value::String(storage::StreamType::Internal.to_string()),
+            );
+        } else {
+            stream_metadata_map.insert(
+                "stream_type".to_owned(),
+                Value::String(storage::StreamType::UserDefined.to_string()),
+            );
+        }
+    }
+
+    stream_metadata
+}
+
 fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value {
     let manifest_list = snapshot.get("manifest_list").unwrap();
     let mut new_manifest_list = Vec::new();
diff --git a/server/src/storage.rs b/server/src/storage.rs
index fe081b617..539f84ce7 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -68,8 +68,8 @@ const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
 // const PERMISSIONS_READ_WRITE: &str = "readwrite";
 const ACCESS_ALL: &str = "all";
 
-pub const CURRENT_OBJECT_STORE_VERSION: &str = "v4";
-pub const CURRENT_SCHEMA_VERSION: &str = "v4";
+pub const CURRENT_OBJECT_STORE_VERSION: &str = "v5";
+pub const CURRENT_SCHEMA_VERSION: &str = "v5";
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub struct ObjectStoreFormat {
@@ -102,6 +102,7 @@ pub struct ObjectStoreFormat {
     pub static_schema_flag: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub hot_tier_enabled: Option<bool>,
+    pub stream_type: String,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub struct StreamInfo {
@@ -121,6 +122,23 @@
     pub custom_partition: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub static_schema_flag: Option<String>,
+    pub stream_type: String,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
+pub enum StreamType {
+    #[default]
+    UserDefined,
+    Internal,
+}
+
+impl std::fmt::Display for StreamType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            StreamType::UserDefined => write!(f, "UserDefined"),
+            StreamType::Internal => write!(f, "Internal"),
+        }
+    }
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -157,6 +175,7 @@ impl Default for ObjectStoreFormat {
         Self {
             version: CURRENT_SCHEMA_VERSION.to_string(),
             objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
+            stream_type: StreamType::UserDefined.to_string(),
             created_at: Local::now().to_rfc3339(),
             first_event_at: None,
             owner: Owner::new("".to_string(), "".to_string()),
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 67119a164..33a7d6353 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -131,6 +131,7 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn create_stream(
         &self,
         stream_name: &str,
@@ -139,12 +140,14 @@
         custom_partition: &str,
         static_schema_flag: &str,
         schema: Arc<Schema>,
+        stream_type: &str,
     ) -> Result {
         let mut format = ObjectStoreFormat::default();
         format.set_id(CONFIG.parseable.username.clone());
         let permission = Permisssion::new(CONFIG.parseable.username.clone());
         format.permissions = vec![permission];
         format.created_at = Local::now().to_rfc3339();
+        format.stream_type = stream_type.to_string();
         if time_partition.is_empty() {
             format.time_partition = None;
         } else {
diff --git a/server/src/validator.rs b/server/src/validator.rs
index 000ccb3cd..391b274e9 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -22,9 +22,9 @@ use self::error::{AlertValidationError, StreamNameValidationError, UsernameValid
 use crate::alerts::rule::base::{NumericRule, StringRule};
 use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule};
 use crate::alerts::{Alerts, Rule};
-use crate::handlers::http::cluster::INTERNAL_STREAM_NAME;
 use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES;
 use crate::option::validation::{bytes_to_human_size, human_size_to_bytes};
+use crate::storage::StreamType;
 
 // Add more sql keywords here in lower case
 const DENIED_NAMES: &[&str] = &[
@@ -75,7 +75,7 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {
     Ok(())
 }
 
-pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> {
+pub fn stream_name(stream_name: &str, stream_type: &str) -> Result<(), StreamNameValidationError> {
     if stream_name.is_empty() {
         return Err(StreamNameValidationError::EmptyName);
     }
@@ -119,7 +119,7 @@ pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> {
         ));
     }
 
-    if stream_name == INTERNAL_STREAM_NAME {
+    if stream_type == StreamType::Internal.to_string() {
         return Err(StreamNameValidationError::InternalStream(
             stream_name.to_owned(),
         ));
@@ -195,7 +195,7 @@ pub mod error {
         NameUpperCase(String),
         #[error("SQL keyword cannot be used as stream name")]
         SQLKeyword(String),
-        #[error("`pmeta` is an internal stream name and cannot be used.")]
+        #[error("The stream {0} is reserved for internal use and cannot be used for user defined streams")]
         InternalStream(String),
     }
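Note on usage: with this patch, a stream's type is carried in the new `x-p-stream-type` header (STREAM_TYPE_KEY) when a stream is created, instead of being inferred from the reserved internal stream name. A minimal client-side sketch against the usual `PUT /api/v1/logstream/{name}` endpoint follows; the host, port, credentials, and stream name are placeholders, not values taken from this patch.

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Hypothetical local deployment; adjust URL and credentials as needed.
    let resp = Client::new()
        .put("http://localhost:8000/api/v1/logstream/app_logs")
        .basic_auth("admin", Some("admin"))
        // Omitting this header falls back to the default, "UserDefined".
        // "Internal" is reserved for streams the server creates itself.
        .header("x-p-stream-type", "UserDefined")
        .send()
        .await?;
    println!("create stream: {}", resp.status());
    Ok(())
}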
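The patch stores the stream type as a plain String (the Display output of StreamType) in metadata and in the stream manifest, and all checks compare against `StreamType::…::to_string()`. The trimmed-down sketch below mirrors that convention; the `parse_stream_type` helper is not part of the patch, it is only an illustration of how a raw `x-p-stream-type` header value could be validated.

#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum StreamType {
    #[default]
    UserDefined,
    Internal,
}

impl std::fmt::Display for StreamType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamType::UserDefined => write!(f, "UserDefined"),
            StreamType::Internal => write!(f, "Internal"),
        }
    }
}

// Hypothetical helper, not in the patch: map a header value back to the enum.
fn parse_stream_type(raw: &str) -> Option<StreamType> {
    match raw {
        "UserDefined" => Some(StreamType::UserDefined),
        "Internal" => Some(StreamType::Internal),
        _ => None,
    }
}

fn main() {
    let header_value = "Internal";
    let stream_type = parse_stream_type(header_value).unwrap_or_default();
    // Mirrors the string comparisons used in writer.rs and logstream.rs.
    assert_eq!(stream_type.to_string(), StreamType::Internal.to_string());
}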
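For the metadata migration, `v4_v5` bumps both version markers to v5 and adds a `stream_type` key only when one is missing: `Internal` for the reserved internal stream (`pmeta`), `UserDefined` for everything else. A rough sketch of the intended before/after shape of the stream metadata JSON, assuming a user-created stream and with all other fields omitted:

use serde_json::{json, Value};

fn main() {
    // Hypothetical v4 stream metadata, trimmed to the fields v4_v5 touches.
    let v4: Value = json!({
        "version": "v4",
        "objectstore-format": "v4"
    });

    // Expected shape after v4_v5 for a user-created stream.
    let v5: Value = json!({
        "version": "v5",
        "objectstore-format": "v5",
        "stream_type": "UserDefined"
    });

    assert_ne!(v4, v5);
    assert_eq!(v5["stream_type"], "UserDefined");
}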