From 51eee313e2327216a9d145dfe5aa7a5dfa7cc0f1 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Fri, 21 Jun 2024 00:21:22 +0530
Subject: [PATCH 1/6] fix for stream sync for ingestors at create and update
 stream

- sync from querier to all live ingestors
- sync stream and schema at server start
---
 server/src/handlers/http/cluster/mod.rs       | 103 +++-------
 server/src/handlers/http/logstream.rs         | 183 ++++++++++--------
 .../src/handlers/http/modal/ingest_server.rs  |  16 +-
 server/src/metadata.rs                        | 104 +++++++---
 server/src/migration.rs                       | 108 +++++------
 server/src/storage/object_storage.rs          |   2 +-
 6 files changed, 266 insertions(+), 250 deletions(-)

diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index 0ced902d4..32ccd5821 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -23,7 +23,6 @@ use crate::handlers::http::cluster::utils::{
 };
 use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
 use crate::handlers::http::logstream::error::StreamError;
-use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
 use crate::option::CONFIG;
 
 use crate::metrics::prom_utils::Metrics;
@@ -96,19 +95,17 @@ pub async fn sync_cache_with_ingestors(
 }
 
 // forward the request to all ingestors to keep them in sync
-#[allow(dead_code)]
 pub async fn sync_streams_with_ingestors(
+    req: HttpRequest,
+    body: Bytes,
     stream_name: &str,
-    time_partition: &str,
-    static_schema: &str,
-    schema: Bytes,
 ) -> Result<(), StreamError> {
     let ingestor_infos = get_ingestor_info().await.map_err(|err| {
         log::error!("Fatal: failed to get ingestor info: {:?}", err);
         StreamError::Anyhow(err)
     })?;
 
-    let mut errored = false;
+    let client = reqwest::Client::new();
     for ingestor in ingestor_infos.iter() {
         let url = format!(
             "{}{}/logstream/{}",
@@ -116,42 +113,29 @@ pub async fn sync_streams_with_ingestors(
             base_path_without_preceding_slash(),
             stream_name
         );
+        let res = client
+            .put(url)
+            .headers(req.headers().into())
+            .header(header::AUTHORIZATION, &ingestor.token)
+            .body(body.clone())
+            .send()
+            .await
+            .map_err(|err| {
+                log::error!(
+                    "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
+                    ingestor.domain_name,
+                    err
+                );
+                StreamError::Network(err)
+            })?;
 
-        match send_stream_sync_request(
-            &url,
-            ingestor.clone(),
-            time_partition,
-            static_schema,
-            schema.clone(),
-        )
-        .await
-        {
-            Ok(_) => continue,
-            Err(_) => {
-                errored = true;
-                break;
-            }
-        }
-    }
-
-    if errored {
-        for ingestor in ingestor_infos {
-            let url = format!(
-                "{}{}/logstream/{}",
+        if !res.status().is_success() {
+            log::error!(
+                "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
                 ingestor.domain_name,
-                base_path_without_preceding_slash(),
-                stream_name
+                res
             );
-
-            // delete the stream
-            send_stream_delete_request(&url, ingestor.clone()).await?;
         }
-
-        // this might be a bit too much
-        return Err(StreamError::Custom {
-            msg: "Failed to sync stream with ingestors".to_string(),
-            status: StatusCode::INTERNAL_SERVER_ERROR,
-        });
     }
 
     Ok(())
@@ -301,49 +285,6 @@ pub async fn fetch_stats_from_ingestors(
     Ok(vec![qs])
 }
 
-#[allow(dead_code)]
-async fn send_stream_sync_request(
-    url: &str,
-    ingestor: IngestorMetadata,
-    time_partition: &str,
-    static_schema: &str,
-    schema: Bytes,
-) -> Result<(), StreamError> {
-    if !utils::check_liveness(&ingestor.domain_name).await {
-        return Ok(());
-    }
-
-    let client = reqwest::Client::new();
-    let res = client
-        .put(url)
-        .header(header::CONTENT_TYPE, "application/json")
-        .header(TIME_PARTITION_KEY, time_partition)
-        .header(STATIC_SCHEMA_FLAG, static_schema)
-        .header(header::AUTHORIZATION, ingestor.token)
-        .body(schema)
-        .send()
-        .await
-        .map_err(|err| {
-            log::error!(
-                "Fatal: failed to forward create stream request to ingestor: {}\n Error: {:?}",
-                ingestor.domain_name,
-                err
-            );
-            StreamError::Network(err)
-        })?;
-
-    if !res.status().is_success() {
-        log::error!(
-            "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
-            ingestor.domain_name,
-            res
-        );
-        return Err(StreamError::Network(res.error_for_status().unwrap_err()));
-    }
-
-    Ok(())
-}
-
 /// send a delete stream request to all ingestors
 pub async fn send_stream_delete_request(
     url: &str,
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 5045dcf8b..ac51a2724 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -19,7 +19,9 @@
 use self::error::{CreateStreamError, StreamError};
 use super::base_path_without_preceding_slash;
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
-use super::cluster::{fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors};
+use super::cluster::{
+    fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
+};
 use crate::alerts::Alerts;
 use crate::handlers::{
     CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
@@ -169,89 +171,14 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
 pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
-        fetch_headers_from_put_stream_request(&req);
-
-    if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" {
-        // Error if the log stream already exists
-        return Err(StreamError::Custom {
-            msg: format!(
-                "Logstream {stream_name} already exists, please create a new log stream with unique name"
-            ),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-
-    if !time_partition.is_empty() && update_stream == "true" {
-        return Err(StreamError::Custom {
-            msg: "Altering the time partition of an existing stream is restricted.".to_string(),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-    let mut time_partition_in_days: &str = "";
-    if !time_partition_limit.is_empty() {
-        let time_partition_days = validate_time_partition_limit(&time_partition_limit);
-        if let Err(err) = time_partition_days {
-            return Err(StreamError::CreateStream(err));
-        } else {
-            time_partition_in_days = time_partition_days.unwrap();
-            if update_stream == "true" {
-                if let Err(err) = update_time_partition_limit_in_stream(
-                    stream_name.clone(),
-                    time_partition_in_days,
-                )
-                .await
-                {
-                    return Err(StreamError::CreateStream(err));
-                }
-                return Ok(("Log stream updated", StatusCode::OK));
-            }
-        }
-    }
-
-    if !static_schema_flag.is_empty() && update_stream == "true" {
-        return Err(StreamError::Custom {
-            msg: "Altering the schema of an existing stream is restricted.".to_string(),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-    if !custom_partition.is_empty() {
-        if let Err(err) = validate_custom_partition(&custom_partition) {
-            return Err(StreamError::CreateStream(err));
-        }
-        if update_stream == "true" {
-            if let Err(err) =
-                update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await
-            {
-                return Err(StreamError::CreateStream(err));
-            }
-            return Ok(("Log stream updated", StatusCode::OK));
-        }
-    }
-
-    let schema = validate_static_schema(
-        &body,
-        &stream_name,
-        &time_partition,
-        &custom_partition,
-        &static_schema_flag,
-    );
-    if let Err(err) = schema {
-        return Err(StreamError::CreateStream(err));
+    if CONFIG.parseable.mode == Mode::Query {
+        create_update_stream(&req, &body, &stream_name).await?;
+        sync_streams_with_ingestors(req, body, &stream_name).await?;
+    } else {
+        create_update_stream(&req, &body, &stream_name).await?;
     }
 
-    create_stream(
-        stream_name,
-        &time_partition,
-        time_partition_in_days,
-        &custom_partition,
-        &static_schema_flag,
-        schema.unwrap(),
-        false,
-    )
-    .await?;
-
     Ok(("Log stream created", StatusCode::OK))
 }
 
@@ -355,6 +282,96 @@ fn validate_static_schema(
     Ok(schema)
 }
 
+async fn create_update_stream(
+    req: &HttpRequest,
+    body: &Bytes,
+    stream_name: &str,
+) -> Result<(), StreamError> {
+    let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
+        fetch_headers_from_put_stream_request(req);
+
+    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
+        // Error if the log stream already exists
+        return Err(StreamError::Custom {
+            msg: format!(
+                "Logstream {stream_name} already exists, please create a new log stream with unique name"
+            ),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+
+    if !time_partition.is_empty() && update_stream == "true" {
+        return Err(StreamError::Custom {
+            msg: "Altering the time partition of an existing stream is restricted.".to_string(),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+    let mut time_partition_in_days: &str = "";
+    if !time_partition_limit.is_empty() {
+        let time_partition_days = validate_time_partition_limit(&time_partition_limit);
+        if let Err(err) = time_partition_days {
+            return Err(StreamError::CreateStream(err));
+        } else {
+            time_partition_in_days = time_partition_days.unwrap();
+            if update_stream == "true" {
+                if let Err(err) = update_time_partition_limit_in_stream(
+                    stream_name.to_string(),
+                    time_partition_in_days,
+                )
+                .await
+                {
+                    return Err(StreamError::CreateStream(err));
+                }
+                return Ok(());
+            }
+        }
+    }
+
+    if !static_schema_flag.is_empty() && update_stream == "true" {
+        return Err(StreamError::Custom {
+            msg: "Altering the schema of an existing stream is restricted.".to_string(),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+
+    if !custom_partition.is_empty() {
+        if let Err(err) = validate_custom_partition(&custom_partition) {
+            return Err(StreamError::CreateStream(err));
+        }
+        if update_stream == "true" {
+            if let Err(err) =
+                update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await
+            {
+                return Err(StreamError::CreateStream(err));
+            }
+            return Ok(());
+        }
+    }
+
+    let schema = validate_static_schema(
+        body,
+        stream_name,
+        &time_partition,
+        &custom_partition,
+        &static_schema_flag,
+    );
+    if let Err(err) = schema {
+        return Err(StreamError::CreateStream(err));
+    }
+
+    create_stream(
+        stream_name.to_string(),
+        &time_partition,
+        time_partition_in_days,
+        &custom_partition,
+        &static_schema_flag,
+        schema.unwrap(),
+        false,
+    )
+    .await?;
+
+    Ok(())
+}
 pub async fn put_alert(
     req: HttpRequest,
     body: web::Json<serde_json::Value>,
@@ -471,7 +488,7 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
         }
     }
 
-    let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?;
+    let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
     Ok((web::Json(cache_enabled), StatusCode::OK))
 }
 
@@ -545,7 +562,7 @@ pub async fn put_enable_cache(
         .put_stream_manifest(&stream_name, &stream_metadata)
         .await?;
 
-    STREAM_INFO.set_stream_cache(&stream_name, enable_cache)?;
+    STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?;
     Ok((
         format!("Cache set to {enable_cache} for log stream {stream_name}"),
        StatusCode::OK,
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index b07cc50af..1ec5a452c 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -165,11 +165,17 @@ impl IngestServer {
             web::scope("/logstream").service(
                 web::scope("/{logstream}")
                     .service(
-                        web::resource("").route(
-                            web::delete()
-                                .to(logstream::delete)
-                                .authorize_for_stream(Action::DeleteStream),
-                        ),
+                        web::resource("")
+                            .route(
+                                web::delete()
+                                    .to(logstream::delete)
+                                    .authorize_for_stream(Action::DeleteStream),
+                            )
+                            .route(
+                                web::put()
+                                    .to(logstream::put_stream)
+                                    .authorize_for_stream(Action::CreateStream),
+                            ),
                     )
                     .service(
                         // GET "/logstream/{logstream}/info" ==> Get info for given log stream
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index d29527c53..43a35295e 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -21,17 +21,24 @@ use arrow_schema::{Field, Fields, Schema};
 use chrono::{Local, NaiveDateTime};
 use itertools::Itertools;
 use once_cell::sync::Lazy;
+use relative_path::RelativePathBuf;
+use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
 
 use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
 use crate::alerts::Alerts;
 use crate::metrics::{
-    EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
-    EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
+    fetch_stats_from_storage, EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE,
+    EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED,
+    LIFETIME_EVENTS_INGESTED_SIZE,
 };
+use crate::option::{Mode, CONFIG};
 use crate::storage::retention::Retention;
-use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir};
+use crate::storage::{
+    LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, STREAM_METADATA_FILE_NAME,
+    STREAM_ROOT_DIRECTORY,
+};
 use crate::utils::arrow::MergedRecordReader;
 
 use derive_more::{Deref, DerefMut};
@@ -92,13 +99,6 @@ impl StreamInfo {
         Ok(!self.schema(stream_name)?.fields.is_empty())
     }
 
-    pub fn cache_enabled(&self, stream_name: &str) -> Result<bool, MetadataError> {
-        let map = self.read().expect(LOCK_EXPECT);
-        map.get(stream_name)
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
-            .map(|metadata| metadata.cache_enabled)
-    }
-
     pub fn get_first_event(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
         let map = self.read().expect(LOCK_EXPECT);
         map.get(stream_name)
@@ -147,15 +147,6 @@ impl StreamInfo {
             .map(|metadata| metadata.retention.clone())
     }
 
-    pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
-        let mut map = self.write().expect(LOCK_EXPECT);
-        let stream = map
-            .get_mut(stream_name)
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?;
-        stream.cache_enabled = enable;
-        Ok(())
-    }
-
     pub fn schema(&self, stream_name: &str) -> Result<Arc<Schema>, MetadataError> {
         let map = self.read().expect(LOCK_EXPECT);
         let schema = map
@@ -237,6 +228,22 @@ impl StreamInfo {
         })
     }
 
+    pub fn get_cache_enabled(&self, stream_name: &str) -> Result<bool, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.cache_enabled)
+    }
+
+    pub fn set_cache_enabled(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
+        let stream = map
+            .get_mut(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?;
+        stream.cache_enabled = enable;
+        Ok(())
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub fn add_stream(
         &self,
@@ -379,8 +386,9 @@ pub async fn load_stream_metadata_on_server_start(
     storage: &(impl ObjectStorage + ?Sized),
     stream_name: &str,
     schema: Schema,
-    meta: &ObjectStoreFormat,
+    stream_metadata_value: Value,
 ) -> Result<(), LoadError> {
+    storage.put_schema(stream_name, &schema).await?;
     let alerts = storage.get_alerts(stream_name).await?;
     let schema = update_schema_from_staging(stream_name, schema);
     let schema = HashMap::from_iter(
         schema
             .fields
             .iter()
             .map(|v| (v.name().to_owned(), v.clone())),
     );
 
+    let mut meta: ObjectStoreFormat = ObjectStoreFormat::default();
+    if !stream_metadata_value.is_null() {
+        meta =
+            serde_json::from_slice(&serde_json::to_vec(&stream_metadata_value).unwrap()).unwrap();
+    }
+
+    let mut retention = meta.retention.clone();
+    let mut time_partition = meta.time_partition.clone();
+    let mut time_partition_limit = meta.time_partition_limit.clone();
+    let mut custom_partition = meta.custom_partition.clone();
+    let mut cache_enabled = meta.cache_enabled;
+    let mut static_schema_flag = meta.static_schema_flag.clone();
+    if CONFIG.parseable.mode == Mode::Ingest {
+        // get the base stream metadata
+        let bytes = storage
+            .get_object(&RelativePathBuf::from_iter([
+                stream_name,
+                STREAM_ROOT_DIRECTORY,
+                STREAM_METADATA_FILE_NAME,
+            ]))
+            .await?;
+        let querier_meta: ObjectStoreFormat = serde_json::from_slice(&bytes).unwrap();
+        retention.clone_from(&querier_meta.retention);
+        time_partition.clone_from(&querier_meta.time_partition);
+        time_partition_limit.clone_from(&querier_meta.time_partition_limit);
+        custom_partition.clone_from(&querier_meta.custom_partition);
+        cache_enabled.clone_from(&querier_meta.cache_enabled);
+        static_schema_flag.clone_from(&querier_meta.static_schema_flag);
+
+        meta = ObjectStoreFormat {
+            retention: retention.clone(),
+            cache_enabled,
+            time_partition: time_partition.clone(),
+            time_partition_limit: time_partition_limit.clone(),
+            custom_partition: custom_partition.clone(),
+            static_schema_flag: static_schema_flag.clone(),
+            ..meta.clone()
+        };
+        storage.put_stream_manifest(stream_name, &meta).await?;
+    }
+
+    //load stats from storage
+    let stats = meta.stats;
+    fetch_stats_from_storage(stream_name, stats).await;
+    load_daily_metrics(&meta, stream_name);
+
     let metadata = LogStreamMetadata {
         schema,
         alerts,
-        retention: meta.retention.clone(),
-        cache_enabled: meta.cache_enabled,
+        retention,
+        cache_enabled,
         created_at: meta.created_at.clone(),
         first_event_at: meta.first_event_at.clone(),
         time_partition: meta.time_partition.clone(),
-        time_partition_limit: meta.time_partition_limit.clone(),
-        custom_partition: meta.custom_partition.clone(),
+        time_partition_limit,
+        custom_partition,
         static_schema_flag: meta.static_schema_flag.clone(),
     };
 
     let mut map = STREAM_INFO.write().expect(LOCK_EXPECT);
     map.insert(stream_name.to_string(), metadata);
-    load_daily_metrics(meta, stream_name);
+
     Ok(())
 }
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 8d459df36..392157901 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -25,12 +25,11 @@ use std::{fs::OpenOptions, sync::Arc};
 
 use crate::{
     metadata::load_stream_metadata_on_server_start,
-    metrics::fetch_stats_from_storage,
-    option::Config,
+    option::{Config, Mode, CONFIG},
     storage::{
         object_storage::{parseable_json_path, stream_json_path},
-        ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME,
-        PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+        ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
+        SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
     },
 };
 use arrow_schema::Schema;
@@ -119,76 +118,75 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
 }
 
 async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
-    let path = stream_json_path(stream);
+    let mut arrow_schema: Schema = Schema::empty();
+    let schema_path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
+    let schema = storage.get_object(&schema_path).await?;
 
+    let path = stream_json_path(stream);
     let stream_metadata = storage.get_object(&path).await.unwrap_or_default();
+    let mut stream_meta_found = true;
     if stream_metadata.is_empty() {
-        return Ok(());
+        if CONFIG.parseable.mode != Mode::Ingest {
+            return Ok(());
+        }
+        stream_meta_found = false;
     }
 
-    let mut stream_metadata: serde_json::Value =
-        serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
-
-    let schema_path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
-    let schema = storage.get_object(&schema_path).await?;
+    let mut stream_metadata_value = Value::Null;
+    if stream_meta_found {
+        stream_metadata_value =
+            serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
 
-    let mut arrow_schema: Schema = Schema::empty();
+        let version = stream_metadata_value
+            .as_object()
+            .and_then(|meta| meta.get("version"))
+            .and_then(|version| version.as_str());
 
-    let version = stream_metadata
-        .as_object()
-        .and_then(|meta| meta.get("version"))
-        .and_then(|version| version.as_str());
-
-    match version {
-        Some("v1") => {
-            stream_metadata = stream_metadata_migration::v1_v4(stream_metadata);
-            storage
-                .put_object(&path, to_bytes(&stream_metadata))
-                .await?;
-            let schema = serde_json::from_slice(&schema).ok();
-            arrow_schema = schema_migration::v1_v4(schema)?;
-            storage
-                .put_object(&schema_path, to_bytes(&arrow_schema))
-                .await?;
-        }
-        Some("v2") => {
-            stream_metadata = stream_metadata_migration::v2_v4(stream_metadata);
-            storage
-                .put_object(&path, to_bytes(&stream_metadata))
-                .await?;
-
-            let schema = serde_json::from_slice(&schema)?;
-            arrow_schema = schema_migration::v2_v4(schema)?;
-            storage
-                .put_object(&schema_path, to_bytes(&arrow_schema))
-                .await?;
-        }
-        Some("v3") => {
-            stream_metadata = stream_metadata_migration::v3_v4(stream_metadata);
-            storage
-                .put_object(&path, to_bytes(&stream_metadata))
-                .await?;
+        match version {
+            Some("v1") => {
+                stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
+                storage
+                    .put_object(&path, to_bytes(&stream_metadata_value))
+                    .await?;
+                let schema = serde_json::from_slice(&schema).ok();
+                arrow_schema = schema_migration::v1_v4(schema)?;
+                storage
+                    .put_object(&schema_path, to_bytes(&arrow_schema))
+                    .await?;
+            }
+            Some("v2") => {
+                stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
+                storage
+                    .put_object(&path, to_bytes(&stream_metadata_value))
+                    .await?;
+
+                let schema = serde_json::from_slice(&schema)?;
+                arrow_schema = schema_migration::v2_v4(schema)?;
+                storage
+                    .put_object(&schema_path, to_bytes(&arrow_schema))
+                    .await?;
+            }
+            Some("v3") => {
+                stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
+                storage
+                    .put_object(&path, to_bytes(&stream_metadata_value))
+                    .await?;
+            }
+            _ => (),
         }
-        _ => (),
     }
 
     if arrow_schema.fields().is_empty() {
         arrow_schema = serde_json::from_slice(&schema)?;
     }
 
-    //load stream metadata from storage
-    let meta: ObjectStoreFormat =
-        serde_json::from_slice(&serde_json::to_vec(&stream_metadata).unwrap()).unwrap();
     if let Err(err) =
-        load_stream_metadata_on_server_start(storage, stream, arrow_schema, &meta).await
+        load_stream_metadata_on_server_start(storage, stream, arrow_schema, stream_metadata_value)
+            .await
     {
         log::error!("could not populate local metadata. {:?}", err);
         return Err(err.into());
     }
 
-    //load stats from storage
-    let stats = meta.stats;
-    fetch_stats_from_storage(stream, stats).await;
-
     Ok(())
 }
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index a5b099f21..7c0dcda7a 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -424,7 +424,7 @@ pub trait ObjectStorage: Sync + 'static {
 
         for stream in &streams {
             let cache_enabled = STREAM_INFO
-                .cache_enabled(stream)
+                .get_cache_enabled(stream)
                 .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
             let time_partition = STREAM_INFO
                 .get_time_partition(stream)

From 9d28a67f4bba3d3902b0aed56b5c45b49ceb142a Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Fri, 21 Jun 2024 08:44:29 +0530
Subject: [PATCH 2/6] update schema in storage only for ingestors

---
 server/src/metadata.rs | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 43a35295e..fb4429a94 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -388,16 +388,6 @@ pub async fn load_stream_metadata_on_server_start(
     schema: Schema,
     stream_metadata_value: Value,
 ) -> Result<(), LoadError> {
-    storage.put_schema(stream_name, &schema).await?;
-    let alerts = storage.get_alerts(stream_name).await?;
-    let schema = update_schema_from_staging(stream_name, schema);
-    let schema = HashMap::from_iter(
-        schema
-            .fields
-            .iter()
-            .map(|v| (v.name().to_owned(), v.clone())),
-    );
-
     let mut meta: ObjectStoreFormat = ObjectStoreFormat::default();
     if !stream_metadata_value.is_null() {
         meta =
@@ -411,6 +401,7 @@ pub async fn load_stream_metadata_on_server_start(
     let mut cache_enabled = meta.cache_enabled;
     let mut static_schema_flag = meta.static_schema_flag.clone();
     if CONFIG.parseable.mode == Mode::Ingest {
+        storage.put_schema(stream_name, &schema).await?;
         // get the base stream metadata
         let bytes = storage
             .get_object(&RelativePathBuf::from_iter([
@@ -444,6 +435,15 @@ pub async fn load_stream_metadata_on_server_start(
     fetch_stats_from_storage(stream_name, stats).await;
     load_daily_metrics(&meta, stream_name);
 
+    let alerts = storage.get_alerts(stream_name).await?;
+    let schema = update_schema_from_staging(stream_name, schema);
+    let schema = HashMap::from_iter(
+        schema
+            .fields
+            .iter()
+            .map(|v| (v.name().to_owned(), v.clone())),
+    );
+
     let metadata = LogStreamMetadata {
         schema,
         alerts,

From b7d674b7e9ee1cf44d1b0ebac48568fe957541b6 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Fri, 21 Jun 2024 12:06:32 +0530
Subject: [PATCH 3/6] staging metadata and ingestor metadata credentials
 validation change

---
 server/src/handlers/http/modal/ingest_server.rs | 3 ++-
 server/src/migration.rs                         | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 1ec5a452c..dd5742ff7 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -31,6 +31,7 @@ use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::object_storage::parseable_json_path;
 use crate::storage::staging;
 use crate::storage::ObjectStorageError;
+use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use crate::sync;
 
 use super::server::Server;
@@ -285,7 +286,7 @@ impl IngestServer {
     async fn validate_credentials(&self) -> anyhow::Result<()> {
         // check if your creds match with others
         let store = CONFIG.storage().get_object_store();
-        let base_path = RelativePathBuf::from("");
+        let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
         let ingestor_metadata = store
             .get_objects(
                 Some(&base_path),
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 392157901..bd1bf655b 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -221,7 +221,7 @@ pub async fn put_remote_metadata(
 }
 
 pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> {
-    let path = parseable_json_path().to_path(config.staging_dir());
+    let path = config.staging_dir().join(".parseable.json");
     let mut file = OpenOptions::new()
         .create(true)
         .truncate(true)

From db4bf9f29f2dfe1a09c6e4f01f7cea614b0b7221 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Fri, 21 Jun 2024 12:41:27 +0530
Subject: [PATCH 4/6] validate credentials for ingestors

---
 .../src/handlers/http/modal/ingest_server.rs | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index dd5742ff7..457d51b03 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -52,9 +52,9 @@ use anyhow::anyhow;
 use async_trait::async_trait;
 use base64::Engine;
 use bytes::Bytes;
-use itertools::Itertools;
 use once_cell::sync::Lazy;
 use relative_path::RelativePathBuf;
+use serde_json::Value;
 
 /// ! have to use a guard before using it
 pub static INGESTOR_META: Lazy<IngestorMetadata> =
@@ -292,12 +292,16 @@ impl IngestServer {
                 Some(&base_path),
                 Box::new(|file_name| file_name.starts_with("ingestor")),
             )
-            .await?
-            .iter()
-            .map(|x| serde_json::from_slice::<IngestorMetadata>(x).unwrap_or_default())
-            .collect_vec();
-        if !ingestor_metadata.is_empty() {
-            let check = ingestor_metadata[0].token.clone();
+            .await?;
+
+        if !ingestor_metadata.len() > 0 {
+            let ingestor_metadata_value: Value =
+                serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
+            let check = ingestor_metadata_value
+                .as_object()
+                .and_then(|meta| meta.get("token"))
+                .and_then(|token| token.as_str())
+                .unwrap();
 
             let token = base64::prelude::BASE64_STANDARD.encode(format!(
                 "{}:{}",

From 25fece4e82817252fcb45a52d9a06e9bc5c525fc Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Fri, 21 Jun 2024 13:05:54 +0530
Subject: [PATCH 5/6] validate credentials for ingestors

---
 server/src/handlers/http/modal/ingest_server.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 457d51b03..06d9e4753 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -293,8 +293,7 @@ impl IngestServer {
                 Box::new(|file_name| file_name.starts_with("ingestor")),
             )
             .await?;
-
-        if !ingestor_metadata.len() > 0 {
+        if ingestor_metadata.len() > 0 {
             let ingestor_metadata_value: Value =
                 serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
             let check = ingestor_metadata_value

From 40c86013133a0244b7caa7c19a4fa462d344c75b Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Fri, 21 Jun 2024 13:11:27 +0530
Subject: [PATCH 6/6] cargo clippy suggestions change

---
 server/src/handlers/http/modal/ingest_server.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 06d9e4753..b5f31a4a5 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -293,7 +293,7 @@ impl IngestServer {
                 Box::new(|file_name| file_name.starts_with("ingestor")),
             )
             .await?;
-        if ingestor_metadata.len() > 0 {
+        if !ingestor_metadata.is_empty() {
            let ingestor_metadata_value: Value =
                 serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
             let check = ingestor_metadata_value
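
Commits 3 through 6 all iterate on the same token comparison inside IngestServer::validate_credentials, so a consolidated view of the logic they converge on may help. Below is a minimal, self-contained sketch under stated assumptions: it uses the serde_json, base64, and anyhow crates already present in the diff, but the free function check_ingestor_token and its parameters are illustrative stand-ins for the server's method, and the bare base64("username:password") token layout is inferred from the visible encode call rather than confirmed by the patch.

// Hypothetical, standalone version of the credential check from commits 4-6.
// Names and the exact stored-token layout are assumptions for illustration.
use base64::Engine;
use serde_json::Value;

fn check_ingestor_token(
    ingestor_json: &[u8], // raw bytes of the first ingestor*.json object found
    username: &str,
    password: &str,
) -> anyhow::Result<()> {
    // Commit 4 switches from deserializing the whole IngestorMetadata struct
    // (via itertools) to pulling just the "token" field out of the JSON value.
    let metadata: Value = serde_json::from_slice(ingestor_json)?;
    let stored_token = metadata
        .as_object()
        .and_then(|meta| meta.get("token"))
        .and_then(|token| token.as_str())
        .ok_or_else(|| anyhow::anyhow!("ingestor metadata has no token field"))?;

    // Same construction as the diff: base64 of "username:password".
    let current_token = base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}"));

    // Assumed comparison; the real metadata may carry an auth-scheme prefix
    // on the stored token, hence the suffix check rather than strict equality.
    if !stored_token.ends_with(&current_token) {
        anyhow::bail!("credentials do not match the token in existing ingestor metadata");
    }
    Ok(())
}

Under these assumptions the check runs once at ingestor startup, against the first ingestor metadata object discovered in the store, and aborts startup on a mismatch; commit 6's clippy cleanup (!ingestor_metadata.is_empty()) only changes how the "any metadata present?" guard is spelled, not this logic.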