diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs
index ea69b0b59..dc301c14d 100644
--- a/server/src/handlers/http.rs
+++ b/server/src/handlers/http.rs
@@ -32,9 +32,6 @@ pub(crate) mod query;
 pub(crate) mod rbac;
 pub(crate) mod role;
 
-// this needs to be removed from here. It is in modal->mod.rs
-// include!(concat!(env!("OUT_DIR"), "/generated.rs"));
-
 pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
 pub const API_BASE_PATH: &str = "/api";
 pub const API_VERSION: &str = "v1";
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 2a3281843..a5ed7978f 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -32,6 +32,7 @@ use crate::handlers::{
     STREAM_NAME_HEADER_KEY,
 };
 use crate::metadata::STREAM_INFO;
+use crate::option::{Mode, CONFIG};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 
 use super::logstream::error::CreateStreamError;
@@ -140,7 +141,18 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
     }
-    super::logstream::create_stream(stream_name.to_string()).await?;
+
+    match &CONFIG.parseable.mode {
+        Mode::All | Mode::Query => {
+            super::logstream::create_stream(stream_name.to_string()).await?;
+        }
+        Mode::Ingest => {
+            return Err(PostError::Invalid(anyhow::anyhow!(
+                "Stream {} not found. Has it been created?",
+                stream_name
+            )));
+        }
+    }
     Ok(())
 }
 
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index a2fc93cc9..53384ee96 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -25,7 +25,7 @@ use serde_json::Value;
 
 use crate::alerts::Alerts;
 use crate::metadata::STREAM_INFO;
-use crate::option::CONFIG;
+use crate::option::{Mode, CONFIG};
 use crate::storage::retention::Retention;
 use crate::storage::{LogStream, StorageDir};
 use crate::{catalog, event, stats};
@@ -33,6 +33,8 @@ use crate::{metadata, validator};
 
 use self::error::{CreateStreamError, StreamError};
 
+use super::modal::query_server::{self, IngestionStats, QueriedStats, StorageStats};
+
 pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
@@ -111,7 +113,6 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
 
 pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
     if metadata::STREAM_INFO.stream_exists(&stream_name) {
         // Error if the log stream already exists
         return Err(StreamError::Custom {
@@ -121,7 +122,11 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
             status: StatusCode::BAD_REQUEST,
         });
     }
-    create_stream(stream_name).await?;
+    if CONFIG.parseable.mode == Mode::Query {
+        query_server::QueryServer::sync_streams_with_ingesters(&stream_name).await?;
+    }
+
+    create_stream(stream_name.clone()).await?;
 
     Ok(("log stream created", StatusCode::OK))
 }
@@ -279,30 +284,62 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
     let stats = stats::get_current_stats(&stream_name, "json")
         .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
 
+    if CONFIG.parseable.mode == Mode::Query {
+        let stats = query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?;
+        let stats = serde_json::to_value(stats).unwrap();
+        return Ok((web::Json(stats), StatusCode::OK));
+    }
+
     let hash_map = STREAM_INFO.read().unwrap();
     let stream_meta = &hash_map
         .get(&stream_name)
         .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
 
     let time = Utc::now();
 
+    let qstats = match &stream_meta.first_event_at {
+        Some(first_event_at) => {
+            let ingestion_stats = IngestionStats::new(
+                stats.events,
+                format!("{} {}", stats.ingestion, "Bytes"),
+                "json",
+            );
+            let storage_stats =
+                StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
+
+            QueriedStats::new(
+                &stream_name,
+                &stream_meta.created_at,
+                Some(first_event_at.to_owned()),
+                time,
+                ingestion_stats,
+                storage_stats,
+            )
+        }
-    let stats = serde_json::json!({
-        "stream": stream_name,
-        "creation_time": &stream_meta.created_at,
-        "first_event_at": Some(&stream_meta.first_event_at),
-        "time": time,
-        "ingestion": {
-            "count": stats.events,
-            "size": format!("{} {}", stats.ingestion, "Bytes"),
-            "format": "json"
-        },
-        "storage": {
-            "size": format!("{} {}", stats.storage, "Bytes"),
-            "format": "parquet"
+        // ? this case should not happen
+        None => {
+            let ingestion_stats = IngestionStats::new(
+                stats.events,
+                format!("{} {}", stats.ingestion, "Bytes"),
+                "json",
+            );
+            let storage_stats =
+                StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
+
+            QueriedStats::new(
+                &stream_name,
+                &stream_meta.created_at,
+                Some('0'.to_string()),
+                time,
+                ingestion_stats,
+                storage_stats,
+            )
         }
-    });
+    };
+
+    let out_stats = serde_json::to_value(qstats).unwrap();
 
-    Ok((web::Json(stats), StatusCode::OK))
+    Ok((web::Json(out_stats), StatusCode::OK))
 }
 
 // Check if the first_event_at is empty
@@ -345,7 +382,6 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
     let created_at = stream_meta.unwrap().created_at;
 
     metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
-
     Ok(())
 }
 
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index cfe1cbf23..c028b0c5c 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -20,6 +20,7 @@ use crate::analytics;
 use crate::banner;
 use crate::handlers::http::logstream;
 use crate::handlers::http::middleware::RouteExt;
+use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE;
 use crate::localcache::LocalCacheManager;
 use crate::metadata;
 use crate::metrics;
@@ -30,8 +31,6 @@ use crate::storage::ObjectStorageError;
 use crate::storage::PARSEABLE_METADATA_FILE_NAME;
 use crate::sync;
 
-use std::net::SocketAddr;
-
 use super::server::Server;
 use super::ssl_acceptor::get_ssl_acceptor;
 use super::IngesterMetadata;
@@ -63,8 +62,8 @@ impl ParseableServer for IngestServer {
         prometheus: PrometheusMetrics,
         _oidc_client: Option<OpenIdClient>,
     ) -> anyhow::Result<()> {
-        // set the ingestor metadata
-        self.set_ingestor_metadata().await?;
+        // set the ingester metadata
+        self.set_ingester_metadata().await?;
 
         // get the ssl stuff
         let ssl = get_ssl_acceptor(
@@ -99,11 +98,9 @@ impl ParseableServer for IngestServer {
 
     /// implement the init method will just invoke the initialize method
     async fn init(&self) -> anyhow::Result<()> {
-        // self.validate()?;
         self.initialize().await
     }
 
-    #[allow(unused)]
     fn validate(&self) -> anyhow::Result<()> {
         if CONFIG.get_storage_mode_string() == "Local drive" {
             return Err(anyhow::Error::msg(
@@ -122,7 +119,10 @@ impl IngestServer {
         config
             .service(
                 // Base path "{url}/api/v1"
-                web::scope(&base_path()).service(Server::get_ingest_factory()),
+                web::scope(&base_path())
+                    .service(Server::get_query_factory())
+                    .service(Server::get_ingest_factory())
+                    .service(Self::logstream_api()),
             )
             .service(Server::get_liveness_factory())
             .service(Server::get_readiness_factory())
@@ -150,32 +150,60 @@ impl IngestServer {
         )
     }
 
-    #[inline(always)]
-    fn get_ingestor_address(&self) -> SocketAddr {
-        // this might cause an issue down the line
-        // best is to make the Cli Struct better, but thats a chore
-        (CONFIG.parseable.address.clone())
-            .parse::<SocketAddr>()
-            .unwrap()
+    fn logstream_api() -> Scope {
+        web::scope("/logstream")
+            .service(
+                // GET "/logstream" ==> Get list of all Log Streams on the server
+                web::resource("")
+                    .route(web::get().to(logstream::list).authorize(Action::ListStream)),
+            )
+            .service(
+                web::scope("/{logstream}")
+                    .service(
+                        web::resource("")
+                            // PUT "/logstream/{logstream}" ==> Create log stream
+                            .route(
+                                web::put()
+                                    .to(logstream::put_stream)
+                                    .authorize_for_stream(Action::CreateStream),
+                            )
+                            // DELETE "/logstream/{logstream}" ==> Delete log stream
+                            .route(
+                                web::delete()
+                                    .to(logstream::delete)
+                                    .authorize_for_stream(Action::DeleteStream),
+                            )
+                            .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
+                    )
+                    .service(
+                        // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
+                        web::resource("/stats").route(
+                            web::get()
+                                .to(logstream::get_stats)
+                                .authorize_for_stream(Action::GetStats),
+                        ),
+                    ),
+            )
     }
 
-    // create the ingestor metadata and put the .ingestor.json file in the object store
-    async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
+    // create the ingester metadata and put the .ingester.json file in the object store
+    async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
         let store = CONFIG.storage().get_object_store();
 
         // remove ip and go with the domain name
-        let sock = self.get_ingestor_address();
+        let sock = Server::get_server_address();
         let path = RelativePathBuf::from(format!(
-            "ingestor.{}.{}.json",
+            "ingester.{}.{}.json",
             sock.ip(), // this might be wrong
             sock.port()
         ));
 
         if store.get_object(&path).await.is_ok() {
-            println!("Ingestor metadata already exists");
+            println!("Ingester metadata already exists");
             return Ok(());
         };
 
+        let scheme = CONFIG.parseable.get_scheme();
         let resource = IngesterMetadata::new(
             sock.port().to_string(),
             CONFIG
@@ -183,13 +211,13 @@ impl IngestServer {
                 .domain_address
                 .clone()
                 .unwrap_or_else(|| {
-                    Url::parse(&format!("http://{}:{}", sock.ip(), sock.port())).unwrap()
+                    Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
                })
                .to_string(),
             DEFAULT_VERSION.to_string(),
             store.get_bucket_name(),
             &CONFIG.parseable.username,
-            &CONFIG.parseable.password, // is this secure?
+            &CONFIG.parseable.password,
         );
 
         let resource = serde_json::to_string(&resource)
@@ -203,7 +231,7 @@ impl IngestServer {
     }
 
     // check for querier state. Is it there, or was it there in the past
-    // this should happen before the set the ingestor metadata
+    // this should happen before setting the ingester metadata
     async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> {
         // how do we check for querier state?
         // based on the work flow of the system, the querier will always need to start first
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index acd5c7579..7ed8f496c 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -16,6 +16,7 @@
  *
  */
 
+use crate::handlers::http::logstream::error::StreamError;
 use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
 use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
 use actix_web::http::header;
@@ -23,8 +24,12 @@ use actix_web::web;
 use actix_web::web::ServiceConfig;
 use actix_web::{App, HttpServer};
 use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use http::StatusCode;
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
+use reqwest::Response;
+use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use tokio::io::AsyncWriteExt;
 use url::Url;
@@ -49,24 +54,14 @@ impl ParseableServer for QueryServer {
         prometheus: actix_web_prometheus::PrometheusMetrics,
         oidc_client: Option<OpenIdClient>,
     ) -> anyhow::Result<()> {
-        let data = Self::get_ingestor_info().await?;
+        let data = Self::get_ingester_info().await?;
 
-        // on subsequent runs, the qurier should check if the ingestor is up and running or not
+        // on subsequent runs, the querier should check if the ingester is up and running or not
         for ingester in data.iter() {
-            // dbg!(&ingester);
-            // yes the format macro does not need the '/' ingester.origin already
-            // has '/' because Url::Parse will add it if it is not present
-            // uri should be something like `http://address/api/v1/liveness`
-            let uri = Url::parse(&format!(
-                "{}{}/liveness",
-                &ingester.domain_name,
-                base_path()
-            ))?;
-
-            if !Self::check_liveness(uri).await {
-                eprintln!("Ingestor at {} is not reachable", &ingester.domain_name);
+            if !Self::check_liveness(&ingester.domain_name).await {
+                eprintln!("Ingester at {} is not reachable", &ingester.domain_name);
             } else {
-                println!("Ingestor at {} is up and running", &ingester.domain_name);
+                println!("Ingester at {} is up and running", &ingester.domain_name);
             }
         }
 
@@ -111,11 +106,9 @@ impl ParseableServer for QueryServer {
 
     /// implementation of init should just invoke a call to initialize
     async fn init(&self) -> anyhow::Result<()> {
-        // self.validate()?;
         self.initialize().await
     }
 
-    #[allow(unused)]
     fn validate(&self) -> anyhow::Result<()> {
         if CONFIG.get_storage_mode_string() == "Local drive" {
             return Err(anyhow::anyhow!(
@@ -148,7 +141,7 @@ impl QueryServer {
     }
 
     // update the .query.json file and return the new IngesterMetadataArr
-    async fn get_ingestor_info() -> anyhow::Result<IngesterMetadataArr> {
+    pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
         let store = CONFIG.storage().get_object_store();
 
         let root_path = RelativePathBuf::from("");
@@ -169,7 +162,9 @@ impl QueryServer {
         Ok(arr)
     }
 
-    pub async fn check_liveness(uri: Url) -> bool {
+    pub async fn check_liveness(domain_name: &str) -> bool {
+        let uri = Url::parse(&format!("{}{}/liveness", domain_name, base_path())).unwrap();
+
         let reqw = reqwest::Client::new()
             .get(uri)
             .header(header::CONTENT_TYPE, "application/json")
@@ -216,7 +211,7 @@ impl QueryServer {
         }
 
         // spawn the sync thread
-        // tokio::spawn(Self::sync_ingestor_metadata());
+        // tokio::spawn(Self::sync_ingester_metadata());
 
         self.start(prometheus, CONFIG.parseable.openid.clone())
             .await?;
@@ -225,12 +220,12 @@ impl QueryServer {
     }
 
     #[allow(dead_code)]
-    async fn sync_ingestor_metadata() {
+    async fn sync_ingester_metadata() {
         let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60 / 10));
         loop {
             interval.tick().await;
             // dbg!("Tick");
-            Self::get_ingestor_info().await.unwrap();
+            Self::get_ingester_info().await.unwrap();
         }
     }
 
@@ -244,4 +239,379 @@ impl QueryServer {
             .await
             .unwrap()
     }
+
+    // forward the request to all ingesters to keep them in sync
+    pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
+        let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
+            log::error!("Fatal: failed to get ingester info: {:?}", err);
+            StreamError::Custom {
+                msg: format!("failed to get ingester info\n{:?}", err),
+                status: StatusCode::INTERNAL_SERVER_ERROR,
+            }
+        })?;
+
+        let mut errored = false;
+        for ingester in ingester_infos.iter() {
+            let url = format!(
+                "{}{}/logstream/{}",
+                ingester.domain_name.to_string().trim_end_matches('/'),
+                base_path(),
+                stream_name
+            );
+
+            match Self::send_stream_sync_request(&url, ingester.clone()).await {
+                Ok(_) => continue,
+                Err(_) => {
+                    errored = true;
+                    break;
+                }
+            }
+        }
+
+        if errored {
+            for ingester in ingester_infos {
+                let url = format!(
+                    "{}{}/logstream/{}",
+                    ingester.domain_name.to_string().trim_end_matches('/'),
+                    base_path(),
+                    stream_name
+                );
+
+                Self::send_stream_rollback_request(&url, ingester.clone()).await?;
+            }
+
+            // this might be a bit too much
+            return Err(StreamError::Custom {
+                msg: "Failed to sync stream with ingesters".to_string(),
+                status: StatusCode::INTERNAL_SERVER_ERROR,
+            });
+        }
+
+        Ok(())
+    }
+
+    pub async fn fetch_stats_from_ingesters(
+        stream_name: &str,
+    ) -> Result<QueriedStats, StreamError> {
+        let mut stats = Vec::new();
+
+        let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
+            log::error!("Fatal: failed to get ingester info: {:?}", err);
+            StreamError::Custom {
+                msg: format!("failed to get ingester info\n{:?}", err),
+                status: StatusCode::INTERNAL_SERVER_ERROR,
+            }
+        })?;
+
+        for ingester in ingester_infos {
+            let url = format!(
+                "{}{}/logstream/{}/stats",
+                ingester.domain_name.to_string().trim_end_matches('/'),
+                base_path(),
+                stream_name
+            );
+
+            match Self::send_stats_request(&url, ingester.clone()).await {
+                Ok(Some(res)) => {
+                    match serde_json::from_str::<QueriedStats>(&res.text().await.unwrap()) {
+                        Ok(stat) => stats.push(stat),
+                        Err(err) => {
+                            log::error!(
+                                "Could not parse stats from ingester: {}\n Error: {:?}",
+                                ingester.domain_name,
+                                err
+                            );
+                            continue;
+                        }
+                    }
+                }
+                Ok(None) => {
+                    log::error!("Ingester at {} is not reachable", &ingester.domain_name);
+                    continue;
+                }
+                Err(err) => {
+                    log::error!(
+                        "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}",
+                        ingester.domain_name,
+                        err
+                    );
+                    return Err(err);
+                }
+            }
+        }
+        let stats = Self::merge_quried_stats(stats);
+
+        Ok(stats)
+    }
+
+    async fn send_stats_request(
+        url: &str,
+        ingester: IngesterMetadata,
+    ) -> Result<Option<Response>, StreamError> {
+        if !Self::check_liveness(&ingester.domain_name).await {
+            return Ok(None);
+        }
+
+        let client = reqwest::Client::new();
+        let res = client
+            .get(url)
+            .header("Content-Type", "application/json")
+            .header("Authorization", ingester.token)
+            .send()
+            .await
+            .map_err(|err| {
+                log::error!(
+                    "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}",
+                    ingester.domain_name,
+                    err
+                );
+
+                StreamError::Custom {
+                    msg: format!(
+                        "failed to fetch stats from ingester: {}\n Error: {:?}",
+                        ingester.domain_name, err
+                    ),
+                    status: StatusCode::INTERNAL_SERVER_ERROR,
+                }
+            })?;
+
+        if !res.status().is_success() {
+            log::error!(
+                "failed to fetch stats from ingester: {}\nResponse Returned: {:?}",
+                ingester.domain_name,
+                res
+            );
+            return Err(StreamError::Custom {
+                msg: format!(
+                    "failed to fetch stats from ingester: {}\nResponse Returned: {:?}",
+                    ingester.domain_name, res.text().await.unwrap_or_default()
+                ),
+                status: StatusCode::INTERNAL_SERVER_ERROR,
+            });
+        }
+
+        Ok(Some(res))
+    }
+
+    async fn send_stream_sync_request(
+        url: &str,
+        ingester: IngesterMetadata,
+    ) -> Result<(), StreamError> {
+        if !Self::check_liveness(&ingester.domain_name).await {
+            return Ok(());
+        }
+
+        let client = reqwest::Client::new();
+        let res = client
+            .put(url)
+            .header("Content-Type", "application/json")
+            .header("Authorization", ingester.token)
+            .send()
+            .await
+            .map_err(|err| {
+                log::error!(
+                    "Fatal: failed to forward create stream request to ingester: {}\n Error: {:?}",
+                    ingester.domain_name,
+                    err
+                );
+                StreamError::Custom {
+                    msg: format!(
+                        "failed to forward create stream request to ingester: {}\n Error: {:?}",
+                        ingester.domain_name, err
+                    ),
+                    status: StatusCode::INTERNAL_SERVER_ERROR,
+                }
+            })?;
+
+        if !res.status().is_success() {
+            log::error!(
+                "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}",
+                ingester.domain_name,
+                res
+            );
+            return Err(StreamError::Custom {
+                msg: format!(
+                    "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}",
+                    ingester.domain_name, res.text().await.unwrap_or_default()
+                ),
+                status: StatusCode::INTERNAL_SERVER_ERROR,
+            });
+        }
+
+        Ok(())
+    }
+
+    async fn send_stream_rollback_request(
+        url: &str,
+        ingester: IngesterMetadata,
+    ) -> Result<(), StreamError> {
+        if !Self::check_liveness(&ingester.domain_name).await {
+            return Ok(());
+        }
+
+        let client = reqwest::Client::new();
+        let resp = client
+            .delete(url)
+            .header("Content-Type", "application/json")
+            .header("Authorization", ingester.token)
+            .send()
+            .await
+            .map_err(|err| {
+                log::error!(
+                    "Fatal: failed to rollback stream creation: {}\n Error: {:?}",
+                    ingester.domain_name,
+                    err
+                );
+                StreamError::Custom {
+                    msg: format!(
+                        "failed to rollback stream creation: {}\n Error: {:?}",
+                        ingester.domain_name, err
+                    ),
+                    status: StatusCode::INTERNAL_SERVER_ERROR,
+                }
+            })?;
+
+        if !resp.status().is_success() {
+            log::error!(
+                "failed to rollback stream creation: {}\nResponse Returned: {:?}",
+                ingester.domain_name,
+                resp
+            );
+            return Err(StreamError::Custom {
+                msg: format!(
+                    "failed to rollback stream creation: {}\nResponse Returned: {:?}",
+                    ingester.domain_name,
+                    resp.text().await.unwrap_or_default()
+                ),
+                status: StatusCode::INTERNAL_SERVER_ERROR,
+            });
+        }
+
+        Ok(())
+    }
+
+    pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
+        let min_creation_time = stats
+            .iter()
+            .map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
+            .min()
+            .unwrap_or_default();
+        let stream_name = stats[0].stream.clone();
+        let min_first_event_at = stats
+            .iter()
+            .map(|x| match x.first_event_at.as_ref() {
+                Some(fea) => fea.parse::<DateTime<Utc>>().unwrap_or_default(),
+                None => Utc::now(),
+            })
+            .min()
+            .unwrap_or_else(Utc::now);
+
+        let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
+
+        let cumulative_ingestion =
+            stats
+                .iter()
+                .map(|x| &x.ingestion)
+                .fold(IngestionStats::default(), |acc, x| IngestionStats {
+                    count: acc.count + x.count,
+                    size: format!(
+                        "{}",
+                        acc.size.split(' ').collect_vec()[0]
+                            .parse::<u64>()
+                            .unwrap_or_default()
+                            + x.size.split(' ').collect_vec()[0]
+                                .parse::<u64>()
+                                .unwrap_or_default()
+                    ),
+                    format: x.format.clone(),
+                });
+
+        let cumulative_storage =
+            stats
+                .iter()
+                .map(|x| &x.storage)
+                .fold(StorageStats::default(), |acc, x| StorageStats {
+                    size: format!(
+                        "{}",
+                        acc.size.split(' ').collect_vec()[0]
+                            .parse::<u64>()
+                            .unwrap_or_default()
+                            + x.size.split(' ').collect_vec()[0]
+                                .parse::<u64>()
+                                .unwrap_or_default()
+                    ),
+                    format: x.format.clone(),
+                });
+
+        QueriedStats::new(
+            &stream_name,
+            &min_creation_time.to_string(),
+            Some(min_first_event_at.to_string()),
+            min_time,
+            cumulative_ingestion,
+            cumulative_storage,
+        )
+    }
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct QueriedStats {
+    pub stream: String,
+    pub creation_time: String,
+    pub first_event_at: Option<String>,
+    pub time: DateTime<Utc>,
+    pub ingestion: IngestionStats,
+    pub storage: StorageStats,
+}
+
+impl QueriedStats {
+    pub fn new(
+        stream: &str,
+        creation_time: &str,
+        first_event_at: Option<String>,
+        time: DateTime<Utc>,
+        ingestion: IngestionStats,
+        storage: StorageStats,
+    ) -> Self {
+        Self {
+            stream: stream.to_string(),
+            creation_time: creation_time.to_string(),
+            first_event_at,
+            time,
+            ingestion,
+            storage,
+        }
+    }
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct IngestionStats {
+    pub count: u64,
+    pub size: String,
+    pub format: String,
+}
+
+impl IngestionStats {
+    pub fn new(count: u64, size: String, format: &str) -> Self {
+        Self {
+            count,
+            size,
+            format: format.to_string(),
+        }
+    }
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct StorageStats {
+    size: String,
+    format: String,
+}
+
+impl StorageStats {
+    pub fn new(size: String, format: &str) -> Self {
+        Self {
+            size,
+            format: format.to_string(),
+        }
+    }
 }
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index 84b9d8e90..005a38614 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -32,6 +32,7 @@ use crate::migration;
 use crate::rbac;
 use crate::storage;
 use crate::sync;
+use std::net::SocketAddr;
 use std::{fs::File, io::BufReader, sync::Arc};
 
 use actix_web::web::resource;
@@ -464,4 +465,13 @@ impl Server {
         };
     }
+
+    #[inline(always)]
+    pub fn get_server_address() -> SocketAddr {
+        // this might cause an issue down the line
+        // best is to make the Cli Struct better, but that's a chore
+        (CONFIG.parseable.address.clone())
+            .parse::<SocketAddr>()
+            .unwrap()
+    }
 }
diff --git a/server/src/main.rs b/server/src/main.rs
index 60bca9fcf..d2991294a 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -67,11 +67,8 @@ async fn main() -> anyhow::Result<()> {
         Mode::All => Arc::new(Server),
     };
 
-    // add logic for graceful shutdown if
     // MODE == Query / Ingest and storage = local-store
-    // option.rs ln: 161
-    // CONFIG.run_time_mode_validation()?;
-
+    server.validate()?;
     server.init().await?;
 
     Ok(())
diff --git a/server/src/migration.rs b/server/src/migration.rs
index eb3b98c0e..e5958fd29 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -30,8 +30,8 @@ use serde::Serialize;
 use crate::{
     option::Config,
     storage::{
-        ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME,
-        STREAM_METADATA_FILE_NAME,
+        object_storage::stream_json_path, ObjectStorage, ObjectStorageError,
+        PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME,
     },
 };
 
@@ -94,7 +94,8 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
 }
 
 async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
-    let path = RelativePathBuf::from_iter([stream, STREAM_METADATA_FILE_NAME]);
+    let path = stream_json_path(stream);
+
     let stream_metadata = storage.get_object(&path).await?;
     let stream_metadata: serde_json::Value =
         serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
diff --git a/server/src/option.rs b/server/src/option.rs
index 99bff7e66..e24e5e7e5 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -157,18 +157,6 @@ impl Config {
         }
         "S3 bucket"
     }
-
-    #[allow(dead_code)]
-    pub fn run_time_mode_validation(&self) -> anyhow::Result<()> {
-        let check = (self.parseable.mode == Mode::Ingest || self.parseable.mode == Mode::Query)
-            && self.storage_name == "drive";
-
-        if check {
-            anyhow::bail!(format!("Cannot start the server in {} mode with local storage, please use S3 bucket for storage", self.parseable.mode.to_str()))
-        }
-
-        Ok(())
-    }
 }
 
 fn create_parseable_cli_command() -> Command {
diff --git a/server/src/storage.rs b/server/src/storage.rs
index ed27ec7c8..f1a7e86b9 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -24,7 +24,7 @@ use std::fmt::Debug;
 
 mod localfs;
 mod metrics_layer;
-mod object_storage;
+pub(crate) mod object_storage;
 pub mod retention;
 mod s3;
 pub mod staging;
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index bce837d58..0ee026789 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -128,15 +128,15 @@ impl ObjectStorage for LocalFS {
         let mut entries = fs::read_dir(&prefix).await?;
         let mut res = Vec::new();
         while let Some(entry) = entries.next_entry().await? {
-            let ingestor_file = entry
+            let ingester_file = entry
                 .path()
                 .file_name()
                 .unwrap_or_default()
                 .to_str()
                 .unwrap_or_default()
-                .contains("ingestor");
+                .contains("ingester");
 
-            if !ingestor_file {
+            if !ingester_file {
                 continue;
             }
 
@@ -302,7 +302,10 @@ async fn dir_with_stream(
     if entry.file_type().await?.is_dir() {
         let path = entry.path();
+
+        // even in ingest mode, we should only look for the global stream metadata file
         let stream_json_path = path.join(STREAM_METADATA_FILE_NAME);
+
         if stream_json_path.exists() {
             Ok(Some(dir_name))
         } else {
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 53c6779e7..32f5102a6 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -25,6 +25,7 @@ use super::{
     STREAM_METADATA_FILE_NAME,
 };
 
+use crate::option::Mode;
 use crate::utils::get_address;
 use crate::{
     alerts::Alerts,
@@ -198,7 +199,26 @@ pub trait ObjectStorage: Sync + 'static {
         &self,
         stream_name: &str,
     ) -> Result<ObjectStoreFormat, ObjectStorageError> {
-        let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
+        let stream_metadata = match self.get_object(&stream_json_path(stream_name)).await {
+            Ok(data) => data,
+            Err(_) => {
+                // ! this is hard coded for now
+                let bytes = self
+                    .get_object(&RelativePathBuf::from_iter([
+                        stream_name,
+                        STREAM_METADATA_FILE_NAME,
+                    ]))
+                    .await?;
+                self.put_stream_manifest(
+                    stream_name,
+                    &serde_json::from_slice::<ObjectStoreFormat>(&bytes)
+                        .expect("parseable config is valid json"),
+                )
+                .await?;
+                bytes
+            }
+        };
+
         Ok(serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"))
     }
 
@@ -435,8 +455,17 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
 }
 
 #[inline(always)]
-fn stream_json_path(stream_name: &str) -> RelativePathBuf {
-    RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME])
+pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
+    match &CONFIG.parseable.mode {
+        Mode::Ingest => {
+            let (ip, port) = get_address();
+            let file_name = format!("ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
+            RelativePathBuf::from_iter([stream_name, &file_name])
+        }
+        Mode::Query | Mode::All => {
+            RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME])
+        }
+    }
 }
 
 #[inline(always)]
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index edf342f9b..91898a4c2 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -305,8 +305,11 @@ impl S3 {
 
         let stream_json_check = FuturesUnordered::new();
 
+        // even in ingest mode, we should only look for the global stream metadata file
+        let file_name = STREAM_METADATA_FILE_NAME.to_string();
+
         for dir in &dirs {
-            let key = format!("{}/{}", dir, STREAM_METADATA_FILE_NAME);
+            let key = format!("{}/{}", dir, file_name);
             let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
             stream_json_check.push(task);
         }
@@ -423,13 +426,13 @@ impl ObjectStorage for S3 {
         let mut res = vec![];
 
         while let Some(meta) = list_stream.next().await.transpose()? {
-            let ingestor_file = meta
+            let ingester_file = meta
                 .location
                 .filename()
                 .unwrap_or_default()
-                .contains("ingestor");
+                .contains("ingester");
 
-            if !ingestor_file {
+            if !ingester_file {
                 continue;
             }
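Note on the cumulative stats computed by `merge_quried_stats` above: each per-ingester `QueriedStats` carries its sizes as strings of the form `"<number> Bytes"`, and the fold sums the numeric prefixes. Below is a minimal standalone sketch of that folding logic, assuming every size string follows that format; `parse_size` and `merge_sizes` are illustrative helper names, not part of this patch:

```rust
// Illustrative sketch of the size-string folding used by merge_quried_stats.
// `parse_size` and `merge_sizes` are hypothetical helpers, not in the patch.

/// Take the numeric prefix of a size string such as "1024 Bytes".
fn parse_size(size: &str) -> u64 {
    size.split(' ')
        .next()
        .and_then(|n| n.parse::<u64>().ok())
        .unwrap_or_default()
}

/// Fold several per-ingester size strings into one cumulative value.
/// Note: like the fold in the patch, this emits only the number and does
/// not re-append the "Bytes" suffix.
fn merge_sizes<'a>(sizes: impl IntoIterator<Item = &'a str>) -> String {
    let total: u64 = sizes.into_iter().map(parse_size).sum();
    format!("{}", total)
}

fn main() {
    // Two ingesters reporting partial ingestion sizes for the same stream.
    let merged = merge_sizes(["1024 Bytes", "2048 Bytes"]);
    assert_eq!(merged, "3072");
    println!("cumulative size: {merged}");
}
```

One consequence of this folding worth noting: the merged `size` drops the `"Bytes"` suffix that the per-ingester stats include, so a `QueriedStats` returned by the query server is formatted differently from a single-node one, and callers parsing `size` should not assume the suffix is present.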