From bfdc0f7b13611d32531f5d58cd5e07281795e603 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 14:07:17 +0530 Subject: [PATCH 1/8] Impl Ingest Server add DELETE endpoint for stream on IngestServer add QueriedStats struct add stats endpoint for logstream on Ingest Server add guard to create stream automatically during ingest on Ingest Server fix: api path /logstream on ingest server make get_ingestor_info func pub add logstream api endpoint to put fix: func dir_with_stream to use the proper stream metadata filename fix: update stream metadata file name make stream_json_path give the correct file path when in different server modes --- server/src/handlers/http.rs | 3 - server/src/handlers/http/ingest.rs | 14 +- server/src/handlers/http/logstream.rs | 74 +++- .../src/handlers/http/modal/ingest_server.rs | 62 ++- .../src/handlers/http/modal/query_server.rs | 412 +++++++++++++++++- server/src/migration.rs | 7 +- server/src/storage.rs | 2 +- server/src/storage/localfs.rs | 28 +- server/src/storage/object_storage.rs | 14 +- server/src/storage/s3.rs | 18 +- 10 files changed, 566 insertions(+), 68 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index ea69b0b59..dc301c14d 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -32,9 +32,6 @@ pub(crate) mod query; pub(crate) mod rbac; pub(crate) mod role; -// this needs to be removed from here. 
It is in modal->mod.rs -// include!(concat!(env!("OUT_DIR"), "/generated.rs")); - pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "/api"; pub const API_VERSION: &str = "v1"; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 2a3281843..a5ed7978f 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -32,6 +32,7 @@ use crate::handlers::{ STREAM_NAME_HEADER_KEY, }; use crate::metadata::STREAM_INFO; +use crate::option::{Mode, CONFIG}; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use super::logstream::error::CreateStreamError; @@ -140,7 +141,18 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr if STREAM_INFO.stream_exists(stream_name) { return Ok(()); } - super::logstream::create_stream(stream_name.to_string()).await?; + + match &CONFIG.parseable.mode { + Mode::All | Mode::Query => { + super::logstream::create_stream(stream_name.to_string()).await?; + } + Mode::Ingest => { + return Err(PostError::Invalid(anyhow::anyhow!( + "Stream {} not found. 
Has it been created?", + stream_name + ))); + } + } Ok(()) } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index a2fc93cc9..53384ee96 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -25,7 +25,7 @@ use serde_json::Value; use crate::alerts::Alerts; use crate::metadata::STREAM_INFO; -use crate::option::CONFIG; +use crate::option::{Mode, CONFIG}; use crate::storage::retention::Retention; use crate::storage::{LogStream, StorageDir}; use crate::{catalog, event, stats}; @@ -33,6 +33,8 @@ use crate::{metadata, validator}; use self::error::{CreateStreamError, StreamError}; +use super::modal::query_server::{self, IngestionStats, QueriedStats, StorageStats}; + pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -111,7 +113,6 @@ pub async fn get_alert(req: HttpRequest) -> Result pub async fn put_stream(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if metadata::STREAM_INFO.stream_exists(&stream_name) { // Error if the log stream already exists return Err(StreamError::Custom { @@ -121,7 +122,11 @@ pub async fn put_stream(req: HttpRequest) -> Result status: StatusCode::BAD_REQUEST, }); } - create_stream(stream_name).await?; + if CONFIG.parseable.mode == Mode::Query { + query_server::QueryServer::sync_streams_with_ingesters(&stream_name).await?; + } + + create_stream(stream_name.clone()).await?; Ok(("log stream created", StatusCode::OK)) } @@ -279,30 +284,62 @@ pub async fn get_stats(req: HttpRequest) -> Result let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + if CONFIG.parseable.mode == Mode::Query { + let stats = query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?; + let stats = serde_json::to_value(stats).unwrap(); + return 
Ok((web::Json(stats), StatusCode::OK)); + } + let hash_map = STREAM_INFO.read().unwrap(); let stream_meta = &hash_map .get(&stream_name) .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; let time = Utc::now(); + let qstats = match &stream_meta.first_event_at { + Some(first_event_at) => { + let ingestion_stats = IngestionStats::new( + stats.events, + format!("{} {}", stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = + StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet"); + + QueriedStats::new( + &stream_name, + &stream_meta.created_at, + Some(first_event_at.to_owned()), + time, + ingestion_stats, + storage_stats, + ) + } - let stats = serde_json::json!({ - "stream": stream_name, - "creation_time": &stream_meta.created_at, - "first_event_at": Some(&stream_meta.first_event_at), - "time": time, - "ingestion": { - "count": stats.events, - "size": format!("{} {}", stats.ingestion, "Bytes"), - "format": "json" - }, - "storage": { - "size": format!("{} {}", stats.storage, "Bytes"), - "format": "parquet" + // ? 
this case should not happen + None => { + let ingestion_stats = IngestionStats::new( + stats.events, + format!("{} {}", stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = + StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet"); + + QueriedStats::new( + &stream_name, + &stream_meta.created_at, + Some('0'.to_string()), + time, + ingestion_stats, + storage_stats, + ) } - }); + }; + + let out_stats = serde_json::to_value(qstats).unwrap(); - Ok((web::Json(stats), StatusCode::OK)) + Ok((web::Json(out_stats), StatusCode::OK)) } // Check if the first_event_at is empty @@ -345,7 +382,6 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> let created_at = stream_meta.unwrap().created_at; metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at); - Ok(()) } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index cfe1cbf23..2a59cca1a 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -20,6 +20,7 @@ use crate::analytics; use crate::banner; use crate::handlers::http::logstream; use crate::handlers::http::middleware::RouteExt; +use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE; use crate::localcache::LocalCacheManager; use crate::metadata; use crate::metrics; @@ -63,8 +64,8 @@ impl ParseableServer for IngestServer { prometheus: PrometheusMetrics, _oidc_client: Option, ) -> anyhow::Result<()> { - // set the ingestor metadata - self.set_ingestor_metadata().await?; + // set the ingester metadata + self.set_ingester_metadata().await?; // get the ssl stuff let ssl = get_ssl_acceptor( @@ -122,7 +123,9 @@ impl IngestServer { config .service( // Base path "{url}/api/v1" - web::scope(&base_path()).service(Server::get_ingest_factory()), + web::scope(&base_path()) + .service(Server::get_ingest_factory()) + .service(Self::logstream_api()), ) .service(Server::get_liveness_factory()) 
.service(Server::get_readiness_factory()) @@ -151,7 +154,7 @@ impl IngestServer { } #[inline(always)] - fn get_ingestor_address(&self) -> SocketAddr { + fn get_ingester_address(&self) -> SocketAddr { // this might cause an issue down the line // best is to make the Cli Struct better, but thats a chore (CONFIG.parseable.address.clone()) @@ -159,23 +162,60 @@ impl IngestServer { .unwrap() } - // create the ingestor metadata and put the .ingestor.json file in the object store - async fn set_ingestor_metadata(&self) -> anyhow::Result<()> { + fn logstream_api() -> Scope { + web::scope("/logstream") + .service( + // GET "/logstream" ==> Get list of all Log Streams on the server + web::resource("") + .route(web::get().to(logstream::list).authorize(Action::ListStream)), + ) + .service( + web::scope("/{logstream}") + .service( + web::resource("") + // PUT "/logstream/{logstream}" ==> Create log stream + .route( + web::put() + .to(logstream::put_stream) + .authorize_for_stream(Action::CreateStream), + ) + // DELETE "/logstream/{logstream}" ==> Delete log stream + .route( + web::delete() + .to(logstream::delete) + .authorize_for_stream(Action::DeleteStream), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) + .service( + // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream + web::resource("/stats").route( + web::get() + .to(logstream::get_stats) + .authorize_for_stream(Action::GetStats), + ), + ), + ) + } + + // create the ingester metadata and put the .ingester.json file in the object store + async fn set_ingester_metadata(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); // remove ip adn go with the domain name - let sock = self.get_ingestor_address(); + let sock = self.get_ingester_address(); let path = RelativePathBuf::from(format!( - "ingestor.{}.{}.json", + "ingester.{}.{}.json", sock.ip(), // this might be wrong sock.port() )); if store.get_object(&path).await.is_ok() { - 
println!("Ingestor metadata already exists"); + println!("Ingester metadata already exists"); return Ok(()); }; + let scheme = CONFIG.parseable.get_scheme(); let resource = IngesterMetadata::new( sock.port().to_string(), CONFIG @@ -183,7 +223,7 @@ impl IngestServer { .domain_address .clone() .unwrap_or_else(|| { - Url::parse(&format!("http://{}:{}", sock.ip(), sock.port())).unwrap() + Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap() }) .to_string(), DEFAULT_VERSION.to_string(), @@ -203,7 +243,7 @@ impl IngestServer { } // check for querier state. Is it there, or was it there in the past - // this should happen before the set the ingestor metadata + // this should happen before the set the ingester metadata async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> { // how do we check for querier state? // based on the work flow of the system, the querier will always need to start first diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index acd5c7579..b4bac794f 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -16,6 +16,7 @@ * */ +use crate::handlers::http::logstream::error::StreamError; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; use crate::{analytics, banner, metadata, metrics, migration, rbac, storage}; use actix_web::http::header; @@ -23,8 +24,12 @@ use actix_web::web; use actix_web::web::ServiceConfig; use actix_web::{App, HttpServer}; use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use http::StatusCode; use itertools::Itertools; use relative_path::RelativePathBuf; +use reqwest::Response; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::io::AsyncWriteExt; use url::Url; @@ -49,24 +54,16 @@ impl ParseableServer for QueryServer { prometheus: actix_web_prometheus::PrometheusMetrics, oidc_client: Option, ) -> 
anyhow::Result<()> { - let data = Self::get_ingestor_info().await?; + let data = Self::get_ingester_info().await?; - // on subsequent runs, the qurier should check if the ingestor is up and running or not + // on subsequent runs, the qurier should check if the ingester is up and running or not for ingester in data.iter() { // dbg!(&ingester); - // yes the format macro does not need the '/' ingester.origin already - // has '/' because Url::Parse will add it if it is not present - // uri should be something like `http://address/api/v1/liveness` - let uri = Url::parse(&format!( - "{}{}/liveness", - &ingester.domain_name, - base_path() - ))?; - - if !Self::check_liveness(uri).await { - eprintln!("Ingestor at {} is not reachable", &ingester.domain_name); + + if !Self::check_liveness(&ingester.domain_name).await { + eprintln!("Ingester at {} is not reachable", &ingester.domain_name); } else { - println!("Ingestor at {} is up and running", &ingester.domain_name); + println!("Ingester at {} is up and running", &ingester.domain_name); } } @@ -148,7 +145,7 @@ impl QueryServer { } // update the .query.json file and return the new IngesterMetadataArr - async fn get_ingestor_info() -> anyhow::Result { + pub async fn get_ingester_info() -> anyhow::Result { let store = CONFIG.storage().get_object_store(); let root_path = RelativePathBuf::from(""); @@ -169,7 +166,9 @@ impl QueryServer { Ok(arr) } - pub async fn check_liveness(uri: Url) -> bool { + pub async fn check_liveness(domain_name: &str) -> bool { + let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap(); + let reqw = reqwest::Client::new() .get(uri) .header(header::CONTENT_TYPE, "application/json") @@ -216,7 +215,7 @@ impl QueryServer { } // spawn the sync thread - // tokio::spawn(Self::sync_ingestor_metadata()); + // tokio::spawn(Self::sync_ingester_metadata()); self.start(prometheus, CONFIG.parseable.openid.clone()) .await?; @@ -225,12 +224,12 @@ impl QueryServer { } #[allow(dead_code)] - async 
fn sync_ingestor_metadata() { + async fn sync_ingester_metadata() { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60 / 10)); loop { interval.tick().await; // dbg!("Tick"); - Self::get_ingestor_info().await.unwrap(); + Self::get_ingester_info().await.unwrap(); } } @@ -244,4 +243,379 @@ impl QueryServer { .await .unwrap() } + + // forward the request to all ingesters to keep them in sync + pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> { + let ingester_infos = Self::get_ingester_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingester info: {:?}", err); + StreamError::Custom { + msg: format!("failed to get ingester info\n{:?}", err), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + let mut errored = false; + for ingester in ingester_infos.iter() { + let url = format!( + "{}{}/logstream/{}", + ingester.domain_name.to_string().trim_end_matches('/'), + base_path(), + stream_name + ); + + match Self::send_stream_sync_request(&url, ingester.clone()).await { + Ok(_) => continue, + Err(_) => { + errored = true; + break; + } + } + } + + if errored { + for ingester in ingester_infos { + let url = format!( + "{}{}/logstream/{}", + ingester.domain_name.to_string().trim_end_matches('/'), + base_path(), + stream_name + ); + + Self::send_stream_rollback_request(&url, ingester.clone()).await?; + } + + // this might be a bit too much + return Err(StreamError::Custom { + msg: "Failed to sync stream with ingesters".to_string(), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(()) + } + + pub async fn fetch_stats_from_ingesters( + stream_name: &str, + ) -> Result { + let mut stats = Vec::new(); + + let ingester_infos = Self::get_ingester_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingester info: {:?}", err); + StreamError::Custom { + msg: format!("failed to get ingester info\n{:?}", err), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + for 
ingester in ingester_infos { + let url = format!( + "{}{}/logstream/{}/stats", + ingester.domain_name.to_string().trim_end_matches('/'), + base_path(), + stream_name + ); + + match Self::send_stats_request(&url, ingester.clone()).await { + Ok(Some(res)) => { + match serde_json::from_str::(&res.text().await.unwrap()) { + Ok(stat) => stats.push(stat), + Err(err) => { + log::error!( + "Could not parse stats from ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + continue; + } + } + } + Ok(None) => { + log::error!("Ingester at {} is not reachable", &ingester.domain_name); + continue; + } + Err(err) => { + log::error!( + "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + return Err(err); + } + } + } + let stats = Self::merge_quried_stats(stats); + + Ok(stats) + } + + async fn send_stats_request( + url: &str, + ingester: IngesterMetadata, + ) -> Result, StreamError> { + if !Self::check_liveness(&ingester.domain_name).await { + return Ok(None); + } + + let client = reqwest::Client::new(); + let res = client + .get(url) + .header("Content-Type", "application/json") + .header("Authorization", ingester.token) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + + StreamError::Custom { + msg: format!( + "failed to fetch stats from ingester: {}\n Error: {:?}", + ingester.domain_name, err + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name, + res + ); + return Err(StreamError::Custom { + msg: format!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name,res.text().await.unwrap_or_default() + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(Some(res)) + } + + async fn 
send_stream_sync_request( + url: &str, + ingester: IngesterMetadata, + ) -> Result<(), StreamError> { + if !Self::check_liveness(&ingester.domain_name).await { + return Ok(()); + } + + let client = reqwest::Client::new(); + let res = client + .put(url) + .header("Content-Type", "application/json") + .header("Authorization", ingester.token) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward create stream request to ingester: {}\n Error: {:?}", + ingester.domain_name, + err + ); + StreamError::Custom { + msg: format!( + "failed to forward create stream request to ingester: {}\n Error: {:?}", + ingester.domain_name, err + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name, + res + ); + return Err(StreamError::Custom { + msg: format!( + "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", + ingester.domain_name,res.text().await.unwrap_or_default() + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(()) + } + + async fn send_stream_rollback_request( + url: &str, + ingester: IngesterMetadata, + ) -> Result<(), StreamError> { + if !Self::check_liveness(&ingester.domain_name).await { + return Ok(()); + } + + let client = reqwest::Client::new(); + let resp = client + .delete(url) + .header("Content-Type", "application/json") + .header("Authorization", ingester.token) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to rollback stream creation: {}\n Error: {:?}", + ingester.domain_name, + err + ); + StreamError::Custom { + msg: format!( + "failed to rollback stream creation: {}\n Error: {:?}", + ingester.domain_name, err + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + if !resp.status().is_success() { + log::error!( + "failed to rollback stream creation: {}\nResponse Returned: {:?}", + 
ingester.domain_name, + resp + ); + return Err(StreamError::Custom { + msg: format!( + "failed to rollback stream creation: {}\nResponse Returned: {:?}", + ingester.domain_name, + resp.text().await.unwrap_or_default() + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } + + Ok(()) + } + + pub fn merge_quried_stats(stats: Vec) -> QueriedStats { + let min_creation_time = stats + .iter() + .map(|x| x.creation_time.parse::>().unwrap()) + .min() + .unwrap_or_default(); + let stream_name = stats[0].stream.clone(); + let min_first_event_at = stats + .iter() + .map(|x| match x.first_event_at.as_ref() { + Some(fea) => fea.parse::>().unwrap_or_default(), + None => Utc::now(), + }) + .min() + .unwrap_or(Utc::now()); + + let min_time = stats.iter().map(|x| x.time).min().unwrap_or(Utc::now()); + + let cumulative_ingestion = + stats + .iter() + .map(|x| &x.ingestion) + .fold(IngestionStats::default(), |acc, x| IngestionStats { + count: acc.count + x.count, + size: format!( + "{}", + acc.size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), + format: x.format.clone(), + }); + + let cumulative_storage = + stats + .iter() + .map(|x| &x.storage) + .fold(StorageStats::default(), |acc, x| StorageStats { + size: format!( + "{}", + acc.size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), + format: x.format.clone(), + }); + + QueriedStats::new( + &stream_name, + &min_creation_time.to_string(), + Some(min_first_event_at.to_string()), + min_time, + cumulative_ingestion, + cumulative_storage, + ) + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct QueriedStats { + pub stream: String, + pub creation_time: String, + pub first_event_at: Option, + pub time: DateTime, + pub ingestion: IngestionStats, + pub storage: StorageStats, +} + +impl QueriedStats { + pub fn new( + stream: 
&str, + creation_time: &str, + first_event_at: Option, + time: DateTime, + ingestion: IngestionStats, + storage: StorageStats, + ) -> Self { + Self { + stream: stream.to_string(), + creation_time: creation_time.to_string(), + first_event_at, + time, + ingestion, + storage, + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct IngestionStats { + pub count: u64, + pub size: String, + pub format: String, +} + +impl IngestionStats { + pub fn new(count: u64, size: String, format: &str) -> Self { + Self { + count, + size, + format: format.to_string(), + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StorageStats { + size: String, + format: String, +} + +impl StorageStats { + pub fn new(size: String, format: &str) -> Self { + Self { + size, + format: format.to_string(), + } + } } diff --git a/server/src/migration.rs b/server/src/migration.rs index eb3b98c0e..e5958fd29 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -30,8 +30,8 @@ use serde::Serialize; use crate::{ option::Config, storage::{ - ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME, - STREAM_METADATA_FILE_NAME, + object_storage::stream_json_path, ObjectStorage, ObjectStorageError, + PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME, }, }; @@ -94,7 +94,8 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> { } async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> { - let path = RelativePathBuf::from_iter([stream, STREAM_METADATA_FILE_NAME]); + let path = stream_json_path(stream); + let stream_metadata = storage.get_object(&path).await?; let stream_metadata: serde_json::Value = serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); diff --git a/server/src/storage.rs b/server/src/storage.rs index ed27ec7c8..f1a7e86b9 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -24,7 +24,7 @@ use std::fmt::Debug; mod localfs; mod 
metrics_layer; -mod object_storage; +pub(crate) mod object_storage; pub mod retention; mod s3; pub mod staging; diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index bce837d58..1e27cb0fc 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -31,8 +31,12 @@ use relative_path::RelativePath; use tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; -use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics}; use crate::option::validation; +use crate::{ + metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics}, + option::{Mode, CONFIG}, + utils::get_address, +}; use super::{ LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, STREAM_METADATA_FILE_NAME, @@ -128,15 +132,15 @@ impl ObjectStorage for LocalFS { let mut entries = fs::read_dir(&prefix).await?; let mut res = Vec::new(); while let Some(entry) = entries.next_entry().await? { - let ingestor_file = entry + let ingester_file = entry .path() .file_name() .unwrap_or_default() .to_str() .unwrap_or_default() - .contains("ingestor"); + .contains("ingester"); - if !ingestor_file { + if !ingester_file { continue; } @@ -302,7 +306,21 @@ async fn dir_with_stream( if entry.file_type().await?.is_dir() { let path = entry.path(); - let stream_json_path = path.join(STREAM_METADATA_FILE_NAME); + + let stream_json_path = match &CONFIG.parseable.mode { + Mode::Ingest => { + let (ip, port) = get_address(); + let file_name = format!( + "ingester.{}.{}{}", + &ip.to_string(), + &port.to_string(), + STREAM_METADATA_FILE_NAME + ); + path.join(file_name) + } + Mode::Query | Mode::All => path.join(STREAM_METADATA_FILE_NAME), + }; + if stream_json_path.exists() { Ok(Some(dir_name)) } else { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 53c6779e7..3fdc90204 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -25,6 +25,7 @@ use 
super::{ STREAM_METADATA_FILE_NAME, }; +use crate::option::Mode; use crate::utils::get_address; use crate::{ alerts::Alerts, @@ -435,8 +436,17 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { } #[inline(always)] -fn stream_json_path(stream_name: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]) +pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { + match &CONFIG.parseable.mode { + Mode::Ingest => { + let (ip, port) = get_address(); + let file_name = format!("ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME); + RelativePathBuf::from_iter([stream_name, &file_name]) + } + Mode::Query | Mode::All => { + RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]) + } + } } #[inline(always)] diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index edf342f9b..c73c4598e 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -39,7 +39,9 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; +use crate::option::{Mode, CONFIG}; use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; +use crate::utils::get_address; use super::metrics_layer::MetricLayer; use super::{ObjectStorageProvider, PARSEABLE_METADATA_FILE_NAME, STREAM_METADATA_FILE_NAME}; @@ -305,8 +307,16 @@ impl S3 { let stream_json_check = FuturesUnordered::new(); + let file_name = match &CONFIG.parseable.mode { + Mode::Ingest => { + let (ip, port) = get_address(); + format!("ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME) + } + Mode::All | Mode::Query => STREAM_METADATA_FILE_NAME.to_string(), + }; + for dir in &dirs { - let key = format!("{}/{}", dir, STREAM_METADATA_FILE_NAME); + let key = format!("{}/{}", dir, file_name); let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; stream_json_check.push(task); } @@ -423,13 +433,13 @@ impl ObjectStorage for S3 { let mut res = 
vec![]; while let Some(meta) = list_stream.next().await.transpose()? { - let ingestor_file = meta + let ingester_file = meta .location .filename() .unwrap_or_default() - .contains("ingestor"); + .contains("ingester"); - if !ingestor_file { + if !ingester_file { continue; } From a8cbcc6bfcc98ae0fd00bf361c9dd3d8c6d26874 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 14:29:51 +0530 Subject: [PATCH 2/8] chore: cleanup --- server/src/handlers/http/modal/ingest_server.rs | 3 +-- server/src/handlers/http/modal/query_server.rs | 1 - server/src/main.rs | 4 ---- server/src/option.rs | 12 ------------ 4 files changed, 1 insertion(+), 19 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 2a59cca1a..5677b0259 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -104,7 +104,6 @@ impl ParseableServer for IngestServer { self.initialize().await } - #[allow(unused)] fn validate(&self) -> anyhow::Result<()> { if CONFIG.get_storage_mode_string() == "Local drive" { return Err(anyhow::Error::msg( @@ -229,7 +228,7 @@ impl IngestServer { DEFAULT_VERSION.to_string(), store.get_bucket_name(), &CONFIG.parseable.username, - &CONFIG.parseable.password, // is this secure? 
+ &CONFIG.parseable.password, ); let resource = serde_json::to_string(&resource) diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index b4bac794f..6f27a543c 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -112,7 +112,6 @@ impl ParseableServer for QueryServer { self.initialize().await } - #[allow(unused)] fn validate(&self) -> anyhow::Result<()> { if CONFIG.get_storage_mode_string() == "Local drive" { return Err(anyhow::anyhow!( diff --git a/server/src/main.rs b/server/src/main.rs index 60bca9fcf..f09939386 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -67,11 +67,7 @@ async fn main() -> anyhow::Result<()> { Mode::All => Arc::new(Server), }; - // add logic for graceful shutdown if // MODE == Query / Ingest and storage = local-store - // option.rs ln: 161 - // CONFIG.run_time_mode_validation()?; - server.init().await?; Ok(()) diff --git a/server/src/option.rs b/server/src/option.rs index 99bff7e66..e24e5e7e5 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -157,18 +157,6 @@ impl Config { } "S3 bucket" } - - #[allow(dead_code)] - pub fn run_time_mode_validation(&self) -> anyhow::Result<()> { - let check = (self.parseable.mode == Mode::Ingest || self.parseable.mode == Mode::Query) - && self.storage_name == "drive"; - - if check { - anyhow::bail!(format!("Cannot start the server in {} mode with local storage, please use S3 bucket for storage", self.parseable.mode.to_str())) - } - - Ok(()) - } } fn create_parseable_cli_command() -> Command { From 046842686b01cf7ac85862891537442b50d60f4b Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 14:31:04 +0530 Subject: [PATCH 3/8] server storage validation Add Guard to disallow the user to run the server in distributed setup with local storage --- server/src/handlers/http/modal/query_server.rs | 4 ++-- server/src/main.rs | 1 + 2 files changed, 3 
insertions(+), 2 deletions(-) diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 6f27a543c..5b34e5152 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -507,9 +507,9 @@ impl QueryServer { None => Utc::now(), }) .min() - .unwrap_or(Utc::now()); + .unwrap_or_else(Utc::now); - let min_time = stats.iter().map(|x| x.time).min().unwrap_or(Utc::now()); + let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now); let cumulative_ingestion = stats diff --git a/server/src/main.rs b/server/src/main.rs index f09939386..d2991294a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -68,6 +68,7 @@ async fn main() -> anyhow::Result<()> { }; // MODE == Query / Ingest and storage = local-store + server.validate()?; server.init().await?; Ok(()) From 4e0ddd6874dedee163a5f09cd29bfdcc4bfc5adf Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 20:37:36 +0530 Subject: [PATCH 4/8] chore: remove dead code --- server/src/handlers/http/modal/ingest_server.rs | 1 - server/src/handlers/http/modal/query_server.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 5677b0259..c0d53e37f 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -100,7 +100,6 @@ impl ParseableServer for IngestServer { /// implement the init method will just invoke the initialize method async fn init(&self) -> anyhow::Result<()> { - // self.validate()?; self.initialize().await } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 5b34e5152..e20adde3d 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -108,7 +108,6 @@ impl ParseableServer for QueryServer { /// 
implementation of init should just invoke a call to initialize async fn init(&self) -> anyhow::Result<()> { - // self.validate()?; self.initialize().await } From 5cb685078c4ae8510c4a65483e0e411311dfaab0 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 21:41:09 +0530 Subject: [PATCH 5/8] move get_server_address func --- server/src/handlers/http/modal/ingest_server.rs | 13 +------------ server/src/handlers/http/modal/server.rs | 10 ++++++++++ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index c0d53e37f..2a503b3fd 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -31,8 +31,6 @@ use crate::storage::ObjectStorageError; use crate::storage::PARSEABLE_METADATA_FILE_NAME; use crate::sync; -use std::net::SocketAddr; - use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; use super::IngesterMetadata; @@ -151,15 +149,6 @@ impl IngestServer { ) } - #[inline(always)] - fn get_ingester_address(&self) -> SocketAddr { - // this might cause an issue down the line - // best is to make the Cli Struct better, but thats a chore - (CONFIG.parseable.address.clone()) - .parse::<SocketAddr>() - .unwrap() - } - fn logstream_api() -> Scope { web::scope("/logstream") .service( @@ -201,7 +190,7 @@ impl IngestServer { let store = CONFIG.storage().get_object_store(); // remove ip adn go with the domain name - let sock = self.get_ingester_address(); + let sock = Server::get_server_address(); let path = RelativePathBuf::from(format!( "ingester.{}.{}.json", sock.ip(), // this might be wrong diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 84b9d8e90..005a38614 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -32,6 +32,7 @@ use crate::migration; use crate::rbac; use crate::storage; use
crate::sync; +use std::net::SocketAddr; use std::{fs::File, io::BufReader, sync::Arc}; use actix_web::web::resource; @@ -464,4 +465,13 @@ impl Server { }; } } + + #[inline(always)] + pub fn get_server_address() -> SocketAddr { + // this might cause an issue down the line + // best is to make the Cli Struct better, but thats a chore + (CONFIG.parseable.address.clone()) + .parse::<SocketAddr>() + .unwrap() + } } From 06a3387319ee83ed0ff73cf5ca97bc337bf29d12 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 21:42:10 +0530 Subject: [PATCH 6/8] fix: Other Ingest Servers were not starting if streams existed If Streams Existed in the data store Server was considering it as stale data --- server/src/storage/localfs.rs | 21 +++------------------ server/src/storage/s3.rs | 11 ++--------- 2 files changed, 5 insertions(+), 27 deletions(-) diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 1e27cb0fc..0ee026789 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -31,12 +31,8 @@ use relative_path::RelativePath; use tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; +use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics}; use crate::option::validation; -use crate::{ - metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics}, - option::{Mode, CONFIG}, - utils::get_address, -}; use super::{ LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, STREAM_METADATA_FILE_NAME, @@ -307,19 +303,8 @@ async fn dir_with_stream( if entry.file_type().await?.is_dir() { let path = entry.path(); - let stream_json_path = match &CONFIG.parseable.mode { - Mode::Ingest => { - let (ip, port) = get_address(); - let file_name = format!( - "ingester.{}.{}{}", - &ip.to_string(), - &port.to_string(), - STREAM_METADATA_FILE_NAME - ); - path.join(file_name) - } - Mode::Query | Mode::All => path.join(STREAM_METADATA_FILE_NAME), - }; + // even in ingest mode, we should only look
for the global stream metadata file + let stream_json_path = path.join(STREAM_METADATA_FILE_NAME); if stream_json_path.exists() { Ok(Some(dir_name)) diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index c73c4598e..91898a4c2 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -39,9 +39,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; -use crate::option::{Mode, CONFIG}; use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; -use crate::utils::get_address; use super::metrics_layer::MetricLayer; use super::{ObjectStorageProvider, PARSEABLE_METADATA_FILE_NAME, STREAM_METADATA_FILE_NAME}; @@ -307,13 +305,8 @@ impl S3 { let stream_json_check = FuturesUnordered::new(); - let file_name = match &CONFIG.parseable.mode { - Mode::Ingest => { - let (ip, port) = get_address(); - format!("ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME) - } - Mode::All | Mode::Query => STREAM_METADATA_FILE_NAME.to_string(), - }; + // even in ingest mode, we should only look for the global stream metadata file + let file_name = STREAM_METADATA_FILE_NAME.to_string(); for dir in &dirs { let key = format!("{}/{}", dir, file_name); From 559ecf508b3f4d8ebbb26d7a3c363ec3969c7781 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 6 Mar 2024 22:19:01 +0530 Subject: [PATCH 7/8] fix: Ingest Server Sync Stream on Startup On Startup of a new Ingest Server, Streams were not being synced up properly --- server/src/handlers/http/modal/query_server.rs | 2 -- server/src/storage/object_storage.rs | 11 ++++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index e20adde3d..7ed8f496c 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -58,8 +58,6 @@ impl ParseableServer for QueryServer { // on 
subsequent runs, the qurier should check if the ingester is up and running or not for ingester in data.iter() { - // dbg!(&ingester); - if !Self::check_liveness(&ingester.domain_name).await { eprintln!("Ingester at {} is not reachable", &ingester.domain_name); } else { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 3fdc90204..a0ca362f7 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -199,7 +199,16 @@ pub trait ObjectStorage: Sync + 'static { &self, stream_name: &str, ) -> Result { - let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; + let stream_metadata = match self.get_object(&stream_json_path(stream_name)).await { + Ok(data) => data, + Err(_) => { + // ! this is hard coded for now + let bytes = self.get_object(&RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME])).await?; + self.put_stream_manifest(stream_name, &serde_json::from_slice::(&bytes).expect("parseable config is valid json")).await?; + bytes + }, + }; + Ok(serde_json::from_slice(&stream_metadata).expect("parseable config is valid json")) } From 6767b9b26773561d6b1fe6c2562838ea22d5b176 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 8 Mar 2024 17:07:38 +0530 Subject: [PATCH 8/8] fix: add /query endpoint on Ingest Server --- server/src/handlers/http/modal/ingest_server.rs | 1 + server/src/storage/object_storage.rs | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 2a503b3fd..c028b0c5c 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -120,6 +120,7 @@ impl IngestServer { .service( // Base path "{url}/api/v1" web::scope(&base_path()) + .service(Server::get_query_factory()) .service(Server::get_ingest_factory()) .service(Self::logstream_api()), ) diff --git 
a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index a0ca362f7..32f5102a6 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -203,10 +203,20 @@ pub trait ObjectStorage: Sync + 'static { Ok(data) => data, Err(_) => { // ! this is hard coded for now - let bytes = self.get_object(&RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME])).await?; - self.put_stream_manifest(stream_name, &serde_json::from_slice::(&bytes).expect("parseable config is valid json")).await?; + let bytes = self + .get_object(&RelativePathBuf::from_iter([ + stream_name, + STREAM_METADATA_FILE_NAME, + ])) + .await?; + self.put_stream_manifest( + stream_name, + &serde_json::from_slice::(&bytes) + .expect("parseable config is valid json"), + ) + .await?; bytes - }, + } }; Ok(serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"))