From c091026bdcec95ca2a0fd6019d63b7847dc6bbeb Mon Sep 17 00:00:00 2001 From: Anirudh Chauhan Date: Sat, 2 Nov 2024 13:02:30 +0530 Subject: [PATCH 1/5] feat: allow stream creation from ingestor in distributed deployments Co-authored-by: Akshat Agarwal --- server/src/handlers/http/cluster/mod.rs | 59 +++++++++++++++++++ server/src/handlers/http/ingest.rs | 16 +++-- .../http/modal/query/querier_logstream.rs | 4 ++ server/src/migration.rs | 4 ++ server/src/migration/metadata_migration.rs | 50 ++++++++++++++++ server/src/storage.rs | 3 +- server/src/storage/store_metadata.rs | 4 ++ 7 files changed, 134 insertions(+), 6 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 4e7d3219f..7b9799789 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -30,6 +30,7 @@ use crate::metrics::prom_utils::Metrics; use crate::rbac::role::model::DefaultPrivilege; use crate::rbac::user::User; use crate::stats::Stats; +use crate::storage::get_staging_metadata; use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; @@ -841,3 +842,61 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { Ok(()) } + +pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> { + let client = reqwest::Client::new(); + + let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| { + StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata")) + })?; + let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap()); + let token = staging_metadata.querier_auth_token.unwrap(); + + if !check_liveness(&querier_endpoint).await { + log::warn!("Querier {} is not live", querier_endpoint); + return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live"))); + } + + let url = format!( + "{}{}/logstream/{}", + querier_endpoint, + base_path_without_preceding_slash(), + stream_name + ); + + let response = client + .put(&url) + .header(header::AUTHORIZATION, &token) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward create stream request to querier: {}\n Error: {:?}", + &url, + err + ); + StreamError::Network(err) + })?; + + let status = response.status(); + + if !status.is_success() { + let response_text = response.text().await.map_err(|err| { + log::error!("Failed to read response text from querier: {}", &url); + StreamError::Network(err) + })?; + + log::error!( + "Failed to forward create stream request to querier: {}\nResponse Returned: {:?}", + &url, + response_text + ); + + return Err(StreamError::Anyhow(anyhow::anyhow!( + "Request failed with status: {}", + status, + ))); + } + + Ok(()) +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 7e36a4d8a..48ec1d9dc 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -26,6 +26,7 @@ use crate::event::{ error::EventError, format::{self, EventFormat}, }; +use crate::handlers::http::cluster::forward_create_stream_request; use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY}; use crate::localcache::CacheError; use crate::metadata::error::stream_info::MetadataError; @@ -210,11 +211,16 @@ pub async fn create_stream_if_not_exists( if !streams.contains(&LogStream { name: stream_name.to_owned(), }) { - log::error!("Stream {} not found", stream_name); - return Err(PostError::Invalid(anyhow::anyhow!( - "Stream `{}` not found. Please create it using the Query server.", - stream_name - ))); + match forward_create_stream_request(stream_name).await { + Ok(()) => log::info!("Stream {} created", stream_name), + Err(e) => { + return Err(PostError::Invalid(anyhow::anyhow!( + "Unable to create stream: {} using query server. Error: {}", + stream_name, + e.to_string(), + ))) + } + }; } metadata::STREAM_INFO .upsert_stream_info( diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index 4fb060850..6fe54c4f1 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -4,6 +4,9 @@ use actix_web::{web, HttpRequest, Responder}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; +use tokio::sync::Mutex; + +static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); use crate::{ event, @@ -77,6 +80,7 @@ pub async fn delete(req: HttpRequest) -> Result { pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let _ = CREATE_STREAM_LOCK.lock().await; let headers = create_update_stream(&req, &body, &stream_name).await?; sync_streams_with_ingestors(headers, body, &stream_name).await?; diff --git a/server/src/migration.rs b/server/src/migration.rs index c0c483b2b..2b1d8a5e5 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -78,6 +78,10 @@ pub async fn run_metadata_migration( let metadata = metadata_migration::v3_v4(storage_metadata); put_remote_metadata(&*object_store, &metadata).await?; } + Some("v4") => { + let metadata = metadata_migration::v4_v5(storage_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } _ => (), } } diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index 10bfa940c..3c32ff241 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -16,6 +16,7 @@ * */ +use base64::Engine; use rand::distributions::DistString; use serde_json::{Map, Value as JsonValue}; @@ -148,6 +149,55 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue { storage_metadata } +// maybe rename +pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue { + let metadata = storage_metadata.as_object_mut().unwrap(); + metadata.remove_entry("version"); + metadata.insert("version".to_string(), JsonValue::String("v5".to_string())); + + match metadata.get("server_mode") { + None => { + metadata.insert( + "server_mode".to_string(), + JsonValue::String(CONFIG.parseable.mode.to_string()), + ); + } + Some(JsonValue::String(mode)) => match mode.as_str() { + "Query" => { + metadata.insert( + "querier_endpoint".to_string(), + JsonValue::String(CONFIG.parseable.address.clone()), + ); + } + "All" => { + metadata.insert( + "server_mode".to_string(), + JsonValue::String(CONFIG.parseable.mode.to_string()), + ); + metadata.insert( + "querier_endpoint".to_string(), + JsonValue::String(CONFIG.parseable.address.clone()), + ); + } + _ => (), + }, + _ => (), + } + + metadata.insert( + "querier_auth_token".to_string(), + JsonValue::String(format!( + "Basic {}", + base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG.parseable.username, CONFIG.parseable.password + )) + )), + ); + + storage_metadata +} + pub async fn migrate_ingester_metadata() -> anyhow::Result> { let imp = ingestor_metadata_path(None); let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await { diff --git a/server/src/storage.rs b/server/src/storage.rs index a018c2b1c..4dc108534 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -40,7 +40,8 @@ pub use localfs::FSConfig; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::S3Config; pub use store_metadata::{ - put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata, + get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, + StorageMetadata, }; // metadata file names in a Stream prefix diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index acdbeb8e4..071f714de 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -63,6 +63,8 @@ pub struct StorageMetadata { pub roles: HashMap>, #[serde(default)] pub default_role: Option, + pub querier_endpoint: Option, + pub querier_auth_token: Option, } impl StorageMetadata { @@ -78,6 +80,8 @@ impl StorageMetadata { streams: Vec::new(), roles: HashMap::default(), default_role: None, + querier_endpoint: None, + querier_auth_token: None, } } From 279fd980763e8d5154743ce15ac561e1c096327f Mon Sep 17 00:00:00 2001 From: Anirudh Chauhan Date: Tue, 5 Nov 2024 17:32:31 +0530 Subject: [PATCH 2/5] fix: add querier details to parseable.json on a fresh init --- server/src/handlers/http/cluster/mod.rs | 4 ++-- server/src/storage/store_metadata.rs | 23 ++++++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 7b9799789..a20bc146b 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -849,8 +849,8 @@ pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), Stre let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| { StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata")) })?; - let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap()); - let token = staging_metadata.querier_auth_token.unwrap(); + let querier_endpoint = to_url_string(staging_metadata.querier_endpoint); + let token = staging_metadata.querier_auth_token; if !check_liveness(&querier_endpoint).await { log::warn!("Querier {} is not live", querier_endpoint); diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 071f714de..34aa79075 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -22,6 +22,7 @@ use std::{ path::PathBuf, }; +use base64::Engine; use bytes::Bytes; use once_cell::sync::OnceCell; use relative_path::RelativePathBuf; @@ -63,12 +64,25 @@ pub struct StorageMetadata { pub roles: HashMap>, #[serde(default)] pub default_role: Option, - pub querier_endpoint: Option, - pub querier_auth_token: Option, + pub querier_endpoint: String, + pub querier_auth_token: String, } impl StorageMetadata { pub fn new() -> Self { + let querier_auth_token = format!( + "Basic {}", + base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG.parseable.username, CONFIG.parseable.password + )) + ); + + let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode { + Mode::All | Mode::Query => (CONFIG.parseable.address.clone(), querier_auth_token), + Mode::Ingest => (String::new(), String::new()), + }; + Self { version: CURRENT_STORAGE_METADATA_VERSION.to_string(), mode: CONFIG.storage_name.to_owned(), @@ -80,11 +94,10 @@ impl StorageMetadata { streams: Vec::new(), roles: HashMap::default(), default_role: None, - querier_endpoint: None, - querier_auth_token: None, + querier_endpoint, + querier_auth_token, } } - pub fn global() -> &'static StaticStorageMetadata { STORAGE_METADATA .get() From 4357c93eab0d7339da9a6a87f185ee7d95167b8f Mon Sep 17 00:00:00 2001 From: Anirudh Chauhan Date: Tue, 5 Nov 2024 18:50:36 +0530 Subject: [PATCH 3/5] style(deepsource): use String::default() instead of String::new() --- server/src/storage/store_metadata.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 34aa79075..1fe84a3d0 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -80,7 +80,7 @@ impl StorageMetadata { let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode { Mode::All | Mode::Query => (CONFIG.parseable.address.clone(), querier_auth_token), - Mode::Ingest => (String::new(), String::new()), + Mode::Ingest => (String::default(), String::default()), }; Self { From 4190f962ff702ed1afffddda29f97d8e08f9da56 Mon Sep 17 00:00:00 2001 From: Anirudh Chauhan Date: Thu, 7 Nov 2024 11:35:26 +0530 Subject: [PATCH 4/5] fix: modify querier_{endpoint, auth_token} to Option --- server/src/handlers/http/cluster/mod.rs | 4 ++-- server/src/storage/store_metadata.rs | 28 ++++++++++++++----------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index a20bc146b..7b9799789 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -849,8 +849,8 @@ pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), Stre let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| { StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata")) })?; - let querier_endpoint = to_url_string(staging_metadata.querier_endpoint); - let token = staging_metadata.querier_auth_token; + let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap()); + let token = staging_metadata.querier_auth_token.unwrap(); if !check_liveness(&querier_endpoint).await { log::warn!("Querier {} is not live", querier_endpoint); diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 1fe84a3d0..f19765f02 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -64,23 +64,27 @@ pub struct StorageMetadata { pub roles: HashMap>, #[serde(default)] pub default_role: Option, - pub querier_endpoint: String, - pub querier_auth_token: String, + pub querier_endpoint: Option, + pub querier_auth_token: Option, } impl StorageMetadata { pub fn new() -> Self { - let querier_auth_token = format!( - "Basic {}", - base64::prelude::BASE64_STANDARD.encode(format!( - "{}:{}", - CONFIG.parseable.username, CONFIG.parseable.password - )) - ); - let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode { - Mode::All | Mode::Query => (CONFIG.parseable.address.clone(), querier_auth_token), - Mode::Ingest => (String::default(), String::default()), + Mode::All | Mode::Query => { + let querier_auth_token = format!( + "Basic {}", + base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG.parseable.username, CONFIG.parseable.password + )) + ); + ( + Some(CONFIG.parseable.address.clone()), + Some(querier_auth_token), + ) + } + Mode::Ingest => (None, None), }; Self { From 66d89f4dbd2b0a94910f00d2387992c56e704fbb Mon Sep 17 00:00:00 2001 From: Anirudh Chauhan Date: Mon, 11 Nov 2024 12:19:55 +0530 Subject: [PATCH 5/5] fix(ingestor): skip self when forwarding put stream request to querier --- server/src/handlers/http/cluster/mod.rs | 17 ++++++++++++++--- server/src/handlers/http/logstream.rs | 2 +- .../http/modal/query/querier_logstream.rs | 16 ++++++++++++++-- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 7b9799789..e438ab8f4 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -65,6 +65,7 @@ pub async fn sync_streams_with_ingestors( headers: HeaderMap, body: Bytes, stream_name: &str, + skip_ingestor: Option, ) -> Result<(), StreamError> { let mut reqwest_headers = http_header::HeaderMap::new(); @@ -77,7 +78,16 @@ pub async fn sync_streams_with_ingestors( })?; let client = reqwest::Client::new(); - for ingestor in ingestor_infos.iter() { + + let final_ingestor_infos = match skip_ingestor { + None => ingestor_infos, + Some(skip_ingestor) => ingestor_infos + .into_iter() + .filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone())) + .collect::>(), + }; + + for ingestor in final_ingestor_infos { if !utils::check_liveness(&ingestor.domain_name).await { log::warn!("Ingestor {} is not live", ingestor.domain_name); continue; @@ -858,10 +868,11 @@ pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), Stre } let url = format!( - "{}{}/logstream/{}", + "{}{}/logstream/{}?skip_ingestors={}", querier_endpoint, base_path_without_preceding_slash(), - stream_name + stream_name, + CONFIG.parseable.ingestor_endpoint, ); let response = client diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 2dd34d4dc..00ddff3f3 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -654,7 +654,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> { header::CONTENT_TYPE, HeaderValue::from_static("application/json"), ); - sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?; + sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?; } Ok(()) } diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index 6fe54c4f1..c67d173ae 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -1,9 +1,11 @@ +use core::str; use std::fs; use actix_web::{web, HttpRequest, Responder}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; +use serde::Deserialize; use tokio::sync::Mutex; static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); @@ -77,12 +79,22 @@ pub async fn delete(req: HttpRequest) -> Result { Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { +#[derive(Deserialize)] +pub struct PutStreamQuery { + skip_ingestors: Option, +} + +pub async fn put_stream( + req: HttpRequest, + body: Bytes, + info: web::Query, +) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let _ = CREATE_STREAM_LOCK.lock().await; let headers = create_update_stream(&req, &body, &stream_name).await?; - sync_streams_with_ingestors(headers, body, &stream_name).await?; + + sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?; Ok(("Log stream created", StatusCode::OK)) }