From 275e8d9086eabe2bebe907616dd9d4e77771de4a Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sat, 10 Aug 2024 13:53:19 +0530
Subject: [PATCH 1/3] enhancement: add stream type to stream definition

1. add stream_type=UserDefined for user defined streams
2. add stream_type=Internal for pmeta
3. migrate existing streams' stream.json from v4 to v5
4. add field stream_type in migration for existing streams
5. add field stream_type in GET /logstream/{logstream}/info API
---
 server/src/event.rs | 7 +++-
 server/src/event/writer.rs | 15 +++++---
 server/src/handlers/http/ingest.rs | 24 ++++++++-----
 server/src/handlers/http/logstream.rs | 11 +++---
 server/src/metadata.rs | 14 +++++++-
 server/src/migration.rs | 13 +++++++
 .../migration/stream_metadata_migration.rs | 34 ++++++++++++++++++-
 server/src/storage.rs | 23 +++++++++++--
 server/src/storage/object_storage.rs | 7 ++--
 server/src/validator.rs | 10 ++++--
 10 files changed, 131 insertions(+), 27 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index 9e8f09a6e..eda5dd889 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -27,7 +27,7 @@ use std::sync::Arc;
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
-use crate::{handlers::http::ingest::PostError, metadata};
+use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
 use chrono::NaiveDateTime;
 use std::collections::HashMap;
@@ -45,6 +45,7 @@ pub struct Event {
     pub parsed_timestamp: NaiveDateTime,
     pub time_partition: Option<String>,
     pub custom_partition_values: HashMap<String, String>,
+    pub stream_type: StreamType,
 }

 // Events holds the schema related to a each event for a single log stream
@@ -75,6 +76,7 @@ impl Event {
             self.rb.clone(),
             self.parsed_timestamp,
             &self.custom_partition_values,
+            &self.stream_type,
         )?;

         metadata::STREAM_INFO.update_stats(
@@ -106,6 +108,7 @@ impl Event {
             self.rb.clone(),
             self.parsed_timestamp,
             &self.custom_partition_values,
+            &self.stream_type,
         )
         .map_err(PostError::Event)
     }
@@ -122,6 +125,7 @@ impl Event {
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), EventError> {
         STREAM_WRITERS.append_to_local(
             stream_name,
@@ -129,6 +133,7 @@ impl Event {
             rb,
             parsed_timestamp,
             custom_partition_values.clone(),
+            stream_type,
         )?;
         Ok(())
     }
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 0090a47f2..af31fc54f 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -26,8 +26,8 @@ use std::{
 };

 use crate::{
-    handlers::http::cluster::INTERNAL_STREAM_NAME,
     option::{Mode, CONFIG},
+    storage::StreamType,
     utils,
 };

@@ -92,6 +92,7 @@ impl WriterTable {
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
         let hashmap_guard = self.read().unwrap();

@@ -104,6 +105,7 @@ impl WriterTable {
                     record,
                     parsed_timestamp,
                     &custom_partition_values,
+                    stream_type,
                 )?;
             }
             None => {
@@ -118,12 +120,14 @@ impl WriterTable {
                     record,
                     parsed_timestamp,
                     &custom_partition_values,
+                    stream_type,
                 )?;
             }
         };
         Ok(())
     }

+    #[allow(clippy::too_many_arguments)]
     fn handle_existing_writer(
         &self,
         stream_writer: &Mutex<Writer>,
@@ -132,8 +136,9 @@
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
-        if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+        if CONFIG.parseable.mode != Mode::Query || *stream_type ==
StreamType::Internal {
             stream_writer.lock().unwrap().push(
                 stream_name,
                 schema_key,
@@ -151,6 +156,7 @@
         Ok(())
     }

+    #[allow(clippy::too_many_arguments)]
     fn handle_missing_writer(
         &self,
         mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
         stream_name: &str,
         schema_key: &str,
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
         match map.get(stream_name) {
             Some(writer) => {
-                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+                if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
                     writer.lock().unwrap().push(
                         stream_name,
                         schema_key,
@@ -175,7 +182,7 @@
                 }
             }
             None => {
-                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+                if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
                     let mut writer = Writer::default();
                     writer.push(
                         stream_name,
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 93b40f35d..b9c10359d 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -34,7 +34,7 @@ use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::{self, STREAM_INFO};
 use crate::option::{Mode, CONFIG};
-use crate::storage::{LogStream, ObjectStorageError};
+use crate::storage::{LogStream, ObjectStorageError, StreamType};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 use crate::utils::json::convert_array_to_object;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -57,13 +57,15 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result Result<(), PostError> {
-    create_stream_if_not_exists(&stream_name, true).await?;
+    create_stream_if_not_exists(&stream_name, StreamType::Internal).await?;
     let size: usize = body.len();
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
@@ -100,6 +102,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         parsed_timestamp,
         time_partition: None,
         custom_partition_values: HashMap::new(),
+        stream_type: StreamType::Internal,
     }
     .process()
     .await?;
@@ -116,7 +119,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result Result {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if stream_name.eq(INTERNAL_STREAM_NAME) {
+    let internal_stream_names = STREAM_INFO.list_internal_streams();
+    if internal_stream_names.contains(&stream_name) || stream_name == INTERNAL_STREAM_NAME {
         return Err(PostError::Invalid(anyhow::anyhow!(
             "Stream {} is an internal stream and cannot be ingested into",
             stream_name
@@ -202,6 +206,7 @@ pub async fn push_logs_unchecked(
         time_partition: None,
         is_first_event: true,                    // NOTE: Maybe should be false
         custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
+        stream_type: StreamType::UserDefined,
     };
     unchecked_event.process_unchecked()?;

@@ -369,6 +374,7 @@ async fn create_process_record_batch(
         parsed_timestamp,
         time_partition: time_partition.clone(),
         custom_partition_values: custom_partition_values.clone(),
+        stream_type: StreamType::UserDefined,
     }
     .process()
     .await?;
@@ -413,7 +419,7 @@ fn into_event_batch(
 // Check if the stream exists and create a new stream if doesn't exist
 pub async fn create_stream_if_not_exists(
     stream_name: &str,
-    internal_stream: bool,
+    stream_type: StreamType,
 ) -> Result<(),
PostError> {
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
     }
@@ -427,7 +433,7 @@ pub async fn create_stream_if_not_exists(
             "",
             "",
             Arc::new(Schema::empty()),
-            internal_stream,
+            stream_type,
         )
         .await?;
     }
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index c8282d560..b2f186180 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -34,6 +34,7 @@ use crate::option::validation::bytes_to_human_size;
 use crate::option::{Mode, CONFIG};
 use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
+use crate::storage::StreamType;
 use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
 use crate::{
     catalog::{self, remove_manifest_from_snapshot},
@@ -377,7 +378,7 @@ async fn create_update_stream(
         &custom_partition,
         &static_schema_flag,
         schema,
-        false,
+        StreamType::UserDefined,
     )
     .await?;

@@ -835,11 +836,11 @@ pub async fn create_stream(
     custom_partition: &str,
     static_schema_flag: &str,
     schema: Arc<Schema>,
-    internal_stream: bool,
+    stream_type: StreamType,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
-    if !internal_stream {
-        validator::stream_name(&stream_name)?;
+    if stream_type != StreamType::Internal {
+        validator::stream_name(&stream_name, &stream_type)?;
     }
     // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
@@ -852,6 +853,7 @@ pub async fn create_stream(
             custom_partition,
             static_schema_flag,
             schema.clone(),
+            stream_type,
         )
         .await
     {
@@ -908,6 +910,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result,
     pub static_schema_flag: Option<String>,
     pub hot_tier_enabled: Option<bool>,
+    pub stream_type: StreamType,
 }

 // It is very unlikely that panic will occur when dealing with metadata.
@@ -341,6 +342,7 @@ impl StreamInfo {
             custom_partition: meta.custom_partition,
             static_schema_flag: meta.static_schema_flag,
             hot_tier_enabled: meta.hot_tier_enabled,
+            stream_type: meta.stream_type,
         };
         let mut map = self.write().expect(LOCK_EXPECT);
@@ -357,6 +359,15 @@
             .collect()
     }

+    pub fn list_internal_streams(&self) -> Vec<String> {
+        self.read()
+            .expect(LOCK_EXPECT)
+            .iter()
+            .filter(|(_, v)| v.stream_type != StreamType::UserDefined)
+            .map(|(k, _)| k.clone())
+            .collect()
+    }
+
     pub fn update_stats(
         &self,
         stream_name: &str,
@@ -471,6 +482,7 @@ pub async fn load_stream_metadata_on_server_start(
         custom_partition,
         static_schema_flag: meta.static_schema_flag.clone(),
         hot_tier_enabled: meta.hot_tier_enabled,
+        stream_type: meta.stream_type,
     };

     let mut map = STREAM_INFO.write().expect(LOCK_EXPECT);
diff --git a/server/src/migration.rs b/server/src/migration.rs
index bd1bf655b..122f9e9e3 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -144,6 +144,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
     match version {
         Some("v1") => {
             stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
@@ -155,6 +157,8 @@
         }
         Some("v2") => {
             stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
@@ -167,6 +171,15 @@
         }
         Some("v3") => {
             stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
+            storage
+                .put_object(&path, to_bytes(&stream_metadata_value))
+                .await?;
+        }
+        Some("v4") => {
+            stream_metadata_value =
+                stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs
index 609e951bd..82c676f82 100644
--- a/server/src/migration/stream_metadata_migration.rs
+++ b/server/src/migration/stream_metadata_migration.rs
@@ -19,7 +19,10 @@

 use serde_json::{json, Value};

-use crate::{catalog::snapshot::CURRENT_SNAPSHOT_VERSION, storage};
+use crate::{
+    catalog::snapshot::CURRENT_SNAPSHOT_VERSION, handlers::http::cluster::INTERNAL_STREAM_NAME,
+    storage,
+};

 pub fn v1_v4(mut stream_metadata: Value) -> Value {
     let stream_metadata_map = stream_metadata.as_object_mut().unwrap();
@@ -144,6 +147,35 @@ pub fn v3_v4(mut stream_metadata: Value) -> Value {
     stream_metadata
 }

+pub fn v4_v5(mut stream_metadata: Value, stream_name: &str) -> Value {
+    let stream_metadata_map: &mut serde_json::Map<String, Value> =
+        stream_metadata.as_object_mut().unwrap();
+    stream_metadata_map.insert(
+        "objectstore-format".to_owned(),
+        Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()),
+    );
+    stream_metadata_map.insert(
+        "version".to_owned(),
+        Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
+    );
+    let stream_type = stream_metadata_map.get("stream_type");
+    if stream_type.is_none() {
+        if stream_name.eq(INTERNAL_STREAM_NAME) {
stream_metadata_map.insert(
+                "stream_type".to_owned(),
+                Value::String(storage::StreamType::Internal.to_string()),
+            );
+        } else {
+            stream_metadata_map.insert(
+                "stream_type".to_owned(),
+                Value::String(storage::StreamType::UserDefined.to_string()),
+            );
+        }
+    }
+
+    stream_metadata
+}
+
 fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value {
     let manifest_list = snapshot.get("manifest_list").unwrap();
     let mut new_manifest_list = Vec::new();
diff --git a/server/src/storage.rs b/server/src/storage.rs
index fe081b617..1926c9b04 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -68,8 +68,8 @@ const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
 // const PERMISSIONS_READ_WRITE: &str = "readwrite";
 const ACCESS_ALL: &str = "all";

-pub const CURRENT_OBJECT_STORE_VERSION: &str = "v4";
-pub const CURRENT_SCHEMA_VERSION: &str = "v4";
+pub const CURRENT_OBJECT_STORE_VERSION: &str = "v5";
+pub const CURRENT_SCHEMA_VERSION: &str = "v5";

 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub struct ObjectStoreFormat {
@@ -102,6 +102,7 @@ pub struct ObjectStoreFormat {
     pub static_schema_flag: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub hot_tier_enabled: Option<bool>,
+    pub stream_type: StreamType,
 }

 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -121,6 +122,23 @@ pub struct StreamInfo {
     pub custom_partition: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub static_schema_flag: Option<String>,
+    pub stream_type: StreamType,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
+pub enum StreamType {
+    #[default]
+    UserDefined,
+    Internal,
+}
+
+impl std::fmt::Display for StreamType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            StreamType::UserDefined => write!(f, "UserDefined"),
+            StreamType::Internal => write!(f, "Internal"),
+        }
+    }
 }

 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -157,6 +175,7 @@ impl Default for ObjectStoreFormat {
         Self {
             version: CURRENT_SCHEMA_VERSION.to_string(),
             objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
+            stream_type: StreamType::UserDefined,
             created_at: Local::now().to_rfc3339(),
             first_event_at: None,
             owner: Owner::new("".to_string(), "".to_string()),
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 67119a164..49d42fd17 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -21,8 +21,8 @@ use super::{
     ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
 };
 use super::{
-    ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
-    SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+    StreamType, ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
+    PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };

 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
@@ -131,6 +131,7 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(())
     }

+    #[allow(clippy::too_many_arguments)]
     async fn create_stream(
         &self,
         stream_name: &str,
@@ -139,12 +140,14 @@
         custom_partition: &str,
         static_schema_flag: &str,
         schema: Arc<Schema>,
+        stream_type: StreamType,
     ) -> Result {
         let mut format = ObjectStoreFormat::default();
         format.set_id(CONFIG.parseable.username.clone());
         let permission = Permisssion::new(CONFIG.parseable.username.clone());
         format.permissions
= vec![permission];
         format.created_at = Local::now().to_rfc3339();
+        format.stream_type = stream_type;
         if time_partition.is_empty() {
             format.time_partition = None;
         } else {
diff --git a/server/src/validator.rs b/server/src/validator.rs
index 000ccb3cd..0a591f2af 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -25,6 +25,7 @@ use crate::alerts::{Alerts, Rule};
 use crate::handlers::http::cluster::INTERNAL_STREAM_NAME;
 use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES;
 use crate::option::validation::{bytes_to_human_size, human_size_to_bytes};
+use crate::storage::StreamType;

 // Add more sql keywords here in lower case
 const DENIED_NAMES: &[&str] = &[
@@ -75,7 +76,10 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {
     Ok(())
 }

-pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> {
+pub fn stream_name(
+    stream_name: &str,
+    stream_type: &StreamType,
+) -> Result<(), StreamNameValidationError> {
     if stream_name.is_empty() {
         return Err(StreamNameValidationError::EmptyName);
     }
@@ -119,7 +123,7 @@ pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> {
         ));
     }

-    if stream_name == INTERNAL_STREAM_NAME {
+    if *stream_type == StreamType::Internal || stream_name == INTERNAL_STREAM_NAME {
         return Err(StreamNameValidationError::InternalStream(
             stream_name.to_owned(),
         ));
@@ -195,7 +199,7 @@ pub mod error {
         NameUpperCase(String),
         #[error("SQL keyword cannot be used as stream name")]
         SQLKeyword(String),
-        #[error("`pmeta` is an internal stream name and cannot be used.")]
+        #[error("The stream {0} is reserved for internal use and cannot be used for user-defined streams")]
         InternalStream(String),
     }

From 01a4e55b5fa5c501983209e040d52e4840588e40 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 12 Aug 2024 14:29:31 +0530
Subject: [PATCH 2/3] review comments incorporated

query server creates the internal stream and syncs it with ingestors on server start;
no ingestion can happen from an ingestor into the internal stream
---
 server/src/handlers.rs | 1 +
 server/src/handlers/http/cluster/mod.rs | 55 +++++++++++++++++--
 server/src/handlers/http/ingest.rs | 18 +++---
 server/src/handlers/http/logstream.rs | 51 +++++++++++++----
 .../src/handlers/http/modal/query_server.rs | 4 ++
 server/src/metadata.rs | 17 +++++-
 server/src/storage.rs | 6 +-
 server/src/storage/object_storage.rs | 8 +-
 server/src/validator.rs | 8 +-
 9 files changed, 127 insertions(+), 41 deletions(-)

diff --git a/server/src/handlers.rs b/server/src/handlers.rs
index 78e7b2da9..60f64fd63 100644
--- a/server/src/handlers.rs
+++ b/server/src/handlers.rs
@@ -34,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
 const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';
 const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
+const STREAM_TYPE_KEY: &str = "x-p-stream-type";
 const OIDC_SCOPE: &str = "openid profile email";
 const COOKIE_AGE_DAYS: usize = 7;
 const SESSION_COOKIE_NAME: &str = "session";
diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index b3bd78334..b9c7d28dd 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -23,13 +23,14 @@ use crate::handlers::http::cluster::utils::{
 };
 use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
 use crate::handlers::http::logstream::error::StreamError;
+use crate::handlers::STREAM_TYPE_KEY;
 use crate::option::CONFIG;

 use crate::metrics::prom_utils::Metrics;
 use crate::stats::Stats;
 use
crate::storage::object_storage::ingestor_metadata_path; -use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; +use crate::storage::{StreamType, PARSEABLE_ROOT_DIRECTORY}; use actix_web::http::header; use actix_web::{HttpRequest, Responder}; use bytes::Bytes; @@ -145,6 +146,53 @@ pub async fn sync_streams_with_ingestors( Ok(()) } +/// sync internal streams with all ingestors +pub async fn sync_internal_streams_with_ingestors(stream_name: &str) -> Result<(), StreamError> { + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + StreamError::Anyhow(err) + })?; + + let client = reqwest::Client::new(); + for ingestor in ingestor_infos.iter() { + if !utils::check_liveness(&ingestor.domain_name).await { + log::warn!("Ingestor {} is not live", ingestor.domain_name); + continue; + } + let url = format!( + "{}{}/logstream/{}", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + let res = client + .put(url) + .header(header::AUTHORIZATION, &ingestor.token) + .header(header::CONTENT_TYPE, "application/json") + .header(STREAM_TYPE_KEY, StreamType::Internal.to_string()) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + StreamError::Network(err) + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, + res.text().await + ); + } + } + + Ok(()) +} + pub async fn fetch_daily_stats_from_ingestors( stream_name: &str, date: &str, @@ -572,7 +620,6 @@ async fn fetch_cluster_metrics() -> Result, PostError> { pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { log::info!("Setting up schedular for cluster metrics ingestion"); - let mut scheduler = AsyncScheduler::new(); scheduler .every(CLUSTER_METRICS_INTERVAL_SECONDS) @@ -583,11 +630,9 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { if !metrics.is_empty() { log::info!("Cluster metrics fetched successfully from all ingestors"); if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { - let stream_name = INTERNAL_STREAM_NAME; - if matches!( ingest_internal_stream( - stream_name.to_string(), + INTERNAL_STREAM_NAME.to_string(), bytes::Bytes::from(metrics_bytes), ) .await, diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index b9c10359d..698bda10a 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,8 +16,7 @@ * */ -use super::cluster::INTERNAL_STREAM_NAME; -use super::logstream::error::CreateStreamError; +use super::logstream::error::{CreateStreamError, StreamError}; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; use super::{kinesis, otel}; @@ -58,14 +57,13 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result Result<(), PostError> { - create_stream_if_not_exists(&stream_name, StreamType::Internal).await?; let size: usize = body.len(); let parsed_timestamp = Utc::now().naive_utc(); let (rb, is_first) = { @@ -119,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let internal_stream_names = STREAM_INFO.list_internal_streams(); - if internal_stream_names.contains(&stream_name) || 
stream_name == INTERNAL_STREAM_NAME { + if internal_stream_names.contains(&stream_name) { return Err(PostError::Invalid(anyhow::anyhow!( "Stream {} is an internal stream and cannot be ingested into", stream_name @@ -419,7 +416,7 @@ fn into_event_batch( // Check if the stream exists and create a new stream if doesn't exist pub async fn create_stream_if_not_exists( stream_name: &str, - stream_type: StreamType, + stream_type: &str, ) -> Result<(), PostError> { if STREAM_INFO.stream_exists(stream_name) { return Ok(()); @@ -494,6 +491,8 @@ pub enum PostError { DashboardError(#[from] DashboardError), #[error("Error: {0}")] CacheError(#[from] CacheError), + #[error("Error: {0}")] + StreamError(#[from] StreamError), } impl actix_web::ResponseError for PostError { @@ -515,6 +514,7 @@ impl actix_web::ResponseError for PostError { PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index b2f186180..8fae8b6a1 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -20,12 +20,14 @@ use self::error::{CreateStreamError, StreamError}; use super::base_path_without_preceding_slash; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use super::cluster::{ - fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors, + fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, + sync_internal_streams_with_ingestors, sync_streams_with_ingestors, INTERNAL_STREAM_NAME, }; +use super::ingest::create_stream_if_not_exists; use crate::alerts::Alerts; use crate::handlers::{ - CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, - UPDATE_STREAM_KEY, + CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, + TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, }; use crate::hottier::{HotTierManager, StreamHotTier}; use crate::metadata::STREAM_INFO; @@ -180,6 +182,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result (String, String, String, String, String) { +) -> (String, String, String, String, String, String) { let mut time_partition = String::default(); let mut time_partition_limit = String::default(); let mut custom_partition = String::default(); let mut static_schema_flag = String::default(); let mut update_stream = String::default(); + let mut stream_type = StreamType::UserDefined.to_string(); req.headers().iter().for_each(|(key, value)| { if key == TIME_PARTITION_KEY { time_partition = value.to_str().unwrap().to_string(); @@ -210,6 +216,9 @@ fn fetch_headers_from_put_stream_request( if key == UPDATE_STREAM_KEY { update_stream = value.to_str().unwrap().to_string(); } + if key == STREAM_TYPE_KEY { + stream_type = value.to_str().unwrap().to_string(); + } }); ( @@ -218,6 +227,7 @@ fn fetch_headers_from_put_stream_request( custom_partition, static_schema_flag, update_stream, + stream_type, ) } @@ -305,8 +315,14 @@ async fn create_update_stream( body: &Bytes, stream_name: &str, ) -> Result<(), StreamError> { - let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) = - fetch_headers_from_put_stream_request(req); + let ( + time_partition, + time_partition_limit, + custom_partition, + 
static_schema_flag, + update_stream, + stream_type, + ) = fetch_headers_from_put_stream_request(req); if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" { return Err(StreamError::Custom { @@ -378,7 +394,7 @@ async fn create_update_stream( &custom_partition, &static_schema_flag, schema, - StreamType::UserDefined, + &stream_type, ) .await?; @@ -647,7 +663,9 @@ pub async fn get_stats(req: HttpRequest) -> Result let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let ingestor_stats = if CONFIG.parseable.mode == Mode::Query { + let ingestor_stats = if CONFIG.parseable.mode == Mode::Query + && STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::UserDefined.to_string() + { Some(fetch_stats_from_ingestors(&stream_name).await?) } else { None @@ -836,11 +854,11 @@ pub async fn create_stream( custom_partition: &str, static_schema_flag: &str, schema: Arc, - stream_type: StreamType, + stream_type: &str, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name - if stream_type != StreamType::Internal { - validator::stream_name(&stream_name, &stream_type)?; + if stream_type != StreamType::Internal.to_string() { + validator::stream_name(&stream_name, stream_type)?; } // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); @@ -876,6 +894,7 @@ pub async fn create_stream( custom_partition.to_string(), static_schema_flag.to_string(), static_schema, + stream_type, ); } Err(err) => { @@ -1041,6 +1060,16 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result Result<(), StreamError> { + if create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()) + .await + .is_ok() + { + sync_internal_streams_with_ingestors(INTERNAL_STREAM_NAME).await?; + } + Ok(()) +} #[allow(unused)] fn classify_json_error(kind: serde_json::error::Category) -> StatusCode { match kind { diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 73a44faf1..c731ea5ca 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -18,6 +18,7 @@ use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; +use crate::handlers::http::logstream::create_internal_stream_if_not_exists; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; use crate::hottier::HotTierManager; @@ -174,6 +175,9 @@ impl QueryServer { migration::run_migration(&CONFIG).await?; + //create internal stream at server start + create_internal_stream_if_not_exists().await?; + FILTERS.load().await?; DASHBOARDS.load().await?; // track all parquet files already in the data directory diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 1534f4980..49a7843a4 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -62,7 +62,7 @@ pub struct LogStreamMetadata { pub custom_partition: Option, pub static_schema_flag: Option, pub hot_tier_enabled: Option, - pub stream_type: StreamType, + pub stream_type: String, } // It is very unlikely that panic will occur when dealing with metadata. 
@@ -269,6 +269,7 @@ impl StreamInfo { custom_partition: String, static_schema_flag: String, static_schema: HashMap>, + stream_type: &str, ) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { @@ -302,6 +303,7 @@ impl StreamInfo { } else { static_schema }, + stream_type: stream_type.to_string(), ..Default::default() }; map.insert(stream_name, metadata); @@ -363,11 +365,18 @@ impl StreamInfo { self.read() .expect(LOCK_EXPECT) .iter() - .filter(|(_, v)| v.stream_type != StreamType::UserDefined) + .filter(|(_, v)| v.stream_type == StreamType::Internal.to_string()) .map(|(k, _)| k.clone()) .collect() } + pub fn stream_type(&self, stream_name: &str) -> Result { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.stream_type.clone()) + } + pub fn update_stats( &self, stream_name: &str, @@ -426,6 +435,7 @@ pub async fn load_stream_metadata_on_server_start( let mut custom_partition = meta.custom_partition.clone(); let mut cache_enabled = meta.cache_enabled; let mut static_schema_flag = meta.static_schema_flag.clone(); + let mut stream_type = meta.stream_type.clone(); if CONFIG.parseable.mode == Mode::Ingest { storage.put_schema(stream_name, &schema).await?; // get the base stream metadata @@ -443,7 +453,7 @@ pub async fn load_stream_metadata_on_server_start( custom_partition.clone_from(&querier_meta.custom_partition); cache_enabled.clone_from(&querier_meta.cache_enabled); static_schema_flag.clone_from(&querier_meta.static_schema_flag); - + stream_type.clone_from(&querier_meta.stream_type); meta = ObjectStoreFormat { retention: retention.clone(), cache_enabled, @@ -451,6 +461,7 @@ pub async fn load_stream_metadata_on_server_start( time_partition_limit: time_partition_limit.clone(), custom_partition: custom_partition.clone(), static_schema_flag: static_schema_flag.clone(), + stream_type: stream_type.clone(), ..meta.clone() }; storage.put_stream_manifest(stream_name, &meta).await?; diff --git a/server/src/storage.rs b/server/src/storage.rs index 1926c9b04..539f84ce7 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -102,7 +102,7 @@ pub struct ObjectStoreFormat { pub static_schema_flag: Option, #[serde(skip_serializing_if = "Option::is_none")] pub hot_tier_enabled: Option, - pub stream_type: StreamType, + pub stream_type: String, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -122,7 +122,7 @@ pub struct StreamInfo { pub custom_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, - pub stream_type: StreamType, + pub stream_type: String, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] @@ -175,7 +175,7 @@ impl Default for ObjectStoreFormat { Self { version: CURRENT_SCHEMA_VERSION.to_string(), objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), - stream_type: StreamType::UserDefined, + stream_type: StreamType::UserDefined.to_string(), created_at: Local::now().to_rfc3339(), first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 49d42fd17..33a7d6353 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -21,8 +21,8 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - StreamType, ALERT_FILE_NAME, MANIFEST_FILE, 
PARSEABLE_METADATA_FILE_NAME, - PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, + SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; use crate::handlers::http::modal::ingest_server::INGESTOR_META; @@ -140,14 +140,14 @@ pub trait ObjectStorage: Sync + 'static { custom_partition: &str, static_schema_flag: &str, schema: Arc, - stream_type: StreamType, + stream_type: &str, ) -> Result { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); let permission = Permisssion::new(CONFIG.parseable.username.clone()); format.permissions = vec![permission]; format.created_at = Local::now().to_rfc3339(); - format.stream_type = stream_type; + format.stream_type = stream_type.to_string(); if time_partition.is_empty() { format.time_partition = None; } else { diff --git a/server/src/validator.rs b/server/src/validator.rs index 0a591f2af..391b274e9 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -22,7 +22,6 @@ use self::error::{AlertValidationError, StreamNameValidationError, UsernameValid use crate::alerts::rule::base::{NumericRule, StringRule}; use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; use crate::alerts::{Alerts, Rule}; -use crate::handlers::http::cluster::INTERNAL_STREAM_NAME; use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES; use crate::option::validation::{bytes_to_human_size, human_size_to_bytes}; use crate::storage::StreamType; @@ -76,10 +75,7 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> { Ok(()) } -pub fn stream_name( - stream_name: &str, - stream_type: &StreamType, -) -> Result<(), StreamNameValidationError> { +pub fn stream_name(stream_name: &str, stream_type: &str) -> Result<(), StreamNameValidationError> { if stream_name.is_empty() { return Err(StreamNameValidationError::EmptyName); } @@ -123,7 +119,7 @@ pub fn stream_name( )); } - if *stream_type == StreamType::Internal || stream_name == INTERNAL_STREAM_NAME { + if stream_type == StreamType::Internal.to_string() { return Err(StreamNameValidationError::InternalStream( stream_name.to_owned(), )); From 273d0228921d48733f7a004a8b345adfa600404e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 13 Aug 2024 13:30:47 +0530 Subject: [PATCH 3/3] 1. removed method created for syncing internal stream 2. 
modified sync_streams_with_ingestors so that it can be used for all streams
---
 server/src/handlers/http/cluster/mod.rs | 63 ++++---------------------
 server/src/handlers/http/logstream.rs | 34 ++++++++-----
 2 files changed, 31 insertions(+), 66 deletions(-)

diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index b9c7d28dd..d91284a08 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -23,19 +23,18 @@ use crate::handlers::http::cluster::utils::{
 };
 use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
 use crate::handlers::http::logstream::error::StreamError;
-use crate::handlers::STREAM_TYPE_KEY;
 use crate::option::CONFIG;

 use crate::metrics::prom_utils::Metrics;
 use crate::stats::Stats;
 use crate::storage::object_storage::ingestor_metadata_path;
+use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
-use crate::storage::{StreamType, PARSEABLE_ROOT_DIRECTORY};
-use actix_web::http::header;
+use actix_web::http::header::{self, HeaderMap};
 use actix_web::{HttpRequest, Responder};
 use bytes::Bytes;
 use chrono::Utc;
-use http::StatusCode;
+use http::{header as http_header, StatusCode};
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
 use serde::de::Error;
@@ -97,57 +96,15 @@ pub async fn sync_cache_with_ingestors(

 // forward the request to all ingestors to keep them in sync
 pub async fn sync_streams_with_ingestors(
-    req: HttpRequest,
+    headers: HeaderMap,
     body: Bytes,
     stream_name: &str,
 ) -> Result<(), StreamError> {
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        log::error!("Fatal: failed to get ingestor info: {:?}", err);
-        StreamError::Anyhow(err)
-    })?;
+    let mut reqwest_headers = http_header::HeaderMap::new();

-    let client = reqwest::Client::new();
-    for ingestor in ingestor_infos.iter() {
-        if !utils::check_liveness(&ingestor.domain_name).await {
-            log::warn!("Ingestor {} is not live", ingestor.domain_name);
-            continue;
-        }
-        let url = format!(
-            "{}{}/logstream/{}",
-            ingestor.domain_name,
-            base_path_without_preceding_slash(),
-            stream_name
-        );
-        let res = client
-            .put(url)
-            .headers(req.headers().into())
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .body(body.clone())
-            .send()
-            .await
-            .map_err(|err| {
-                log::error!(
-                    "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name,
-                    err
-                );
-                StreamError::Network(err)
-            })?;
-
-        if !res.status().is_success() {
-            log::error!(
-                "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res
-            );
-        }
+    for (key, value) in headers.iter() {
+        reqwest_headers.insert(key.clone(), value.clone());
     }
-
-    Ok(())
-}
-
-/// sync internal streams with all ingestors
-pub async fn sync_internal_streams_with_ingestors(stream_name: &str) -> Result<(), StreamError> {
     let ingestor_infos = get_ingestor_info().await.map_err(|err| {
         log::error!("Fatal: failed to get ingestor info: {:?}", err);
         StreamError::Anyhow(err)
     })?;
@@ -167,9 +124,9 @@
         );
         let res = client
             .put(url)
+            .headers(reqwest_headers.clone())
             .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .header(STREAM_TYPE_KEY, StreamType::Internal.to_string())
+            .body(body.clone())
             .send()
             .await
             .map_err(|err| {
                 log::error!(
                     "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
                     ingestor.domain_name,
                     err
                 );
                 StreamError::Network(err)
             })?;
@@ -185,7 +142,7 @@ pub async fn
sync_internal_streams_with_ingestors(stream_name: &str) -> Result<(), StreamError> {
             log::error!(
                 "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
                 ingestor.domain_name,
-                res.text().await
+                res
             );
         }
     }
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 8fae8b6a1..080761b7d 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -20,8 +20,8 @@ use self::error::{CreateStreamError, StreamError};
 use super::base_path_without_preceding_slash;
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
 use super::cluster::{
-    fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
-    sync_internal_streams_with_ingestors, sync_streams_with_ingestors, INTERNAL_STREAM_NAME,
+    fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
+    INTERNAL_STREAM_NAME,
 };
 use super::ingest::create_stream_if_not_exists;
 use crate::alerts::Alerts;
@@ -44,16 +44,19 @@ use crate::{
 };
 use crate::{metadata, validator};

+use actix_web::http::header::{self, HeaderMap};
 use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, Responder};
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::Utc;
+use http::{HeaderName, HeaderValue};
 use itertools::Itertools;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::fs;
 use std::num::NonZeroU32;
+use std::str::FromStr;
 use std::sync::Arc;

 pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
@@ -179,12 +182,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result<(), StreamError> {
+) -> Result<HeaderMap, StreamError> {
     let (
         time_partition,
         time_partition_limit,
         custom_partition,
@@ -355,17 +355,16 @@ async fn create_update_stream(
             let time_partition_days = validate_time_partition_limit(&time_partition_limit)?;
             update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days)
                 .await?;
-            return Ok(());
+            return Ok(req.headers().clone());
         }

         if !custom_partition.is_empty() {
             validate_custom_partition(&custom_partition)?;
             update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?;
-            return Ok(());
         } else {
             update_custom_partition_in_stream(stream_name.to_string(), "").await?;
-            return Ok(());
         }
+        return Ok(req.headers().clone());
     }
     let mut time_partition_in_days = "";
     if !time_partition_limit.is_empty() {
@@ -398,7 +397,7 @@
     )
     .await?;

-    Ok(())
+    Ok(req.headers().clone())
 }

 pub async fn put_alert(
     req: HttpRequest,
@@ -1066,7 +1065,16 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
         .await
         .is_ok()
     {
-        sync_internal_streams_with_ingestors(INTERNAL_STREAM_NAME).await?;
+        let mut header_map = HeaderMap::new();
+        header_map.insert(
+            HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
+            HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
+        );
+        header_map.insert(
+            header::CONTENT_TYPE,
+            HeaderValue::from_static("application/json"),
+        );
+        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
     }
     Ok(())
 }
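---

A condensed, self-contained sketch of what the v4 -> v5 migration in this series does to a stream.json document. It is an approximation for readers, not code from the patches; it assumes the serde_json crate and hard-codes "pmeta" as the internal stream name (the value implied by the old validator error message that patch 1 replaces).

use serde_json::{json, Value};

// Assumed: the pre-series validator error ("`pmeta` is an internal stream
// name and cannot be used.") implies INTERNAL_STREAM_NAME == "pmeta".
const INTERNAL_STREAM_NAME: &str = "pmeta";

// Mirrors v4_v5() from stream_metadata_migration.rs: bump both version
// markers to v5 and backfill "stream_type" for metadata written before
// the field existed.
fn v4_v5(mut stream_metadata: Value, stream_name: &str) -> Value {
    let map = stream_metadata.as_object_mut().unwrap();
    map.insert("objectstore-format".to_owned(), Value::String("v5".to_owned()));
    map.insert("version".to_owned(), Value::String("v5".to_owned()));
    if map.get("stream_type").is_none() {
        // Only the internal metrics stream is tagged Internal; every other
        // stream already on disk was created by a user.
        let stream_type = if stream_name == INTERNAL_STREAM_NAME {
            "Internal"
        } else {
            "UserDefined"
        };
        map.insert("stream_type".to_owned(), Value::String(stream_type.to_owned()));
    }
    stream_metadata
}

fn main() {
    let v4 = json!({ "version": "v4", "objectstore-format": "v4" });
    let v5 = v4_v5(v4, INTERNAL_STREAM_NAME);
    assert_eq!(v5["version"], "v5");
    assert_eq!(v5["stream_type"], "Internal");
    // Unit variants with serde derives serialize to the variant name, so
    // StreamType::UserDefined round-trips as the JSON string "UserDefined",
    // matching both the Display impl from patch 1 and the string
    // comparisons that patches 2 and 3 switch to.
}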