From c96a09b8fe45e0c7cae45f627f5483d686acb092 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Sat, 6 Apr 2024 18:45:21 +0530 Subject: [PATCH 1/5] feat: cli update --- server/src/option.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 43bed851d..dd92eee2f 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -42,7 +42,15 @@ pub struct Config { impl Config { fn new() -> Self { - let cli = create_parseable_cli_command().get_matches(); + let cli = create_parseable_cli_command() + .name("Parseable") + .about("A Cloud Native, log analytics platform") + .before_help("Log Lake for the cloud-native world") + .arg_required_else_help(true) + .subcommand_required(true) + .color(clap::ColorChoice::Always) + .get_matches(); + match cli.subcommand() { Some(("local-store", m)) => { let cli = match Cli::from_arg_matches(m) { @@ -181,7 +189,8 @@ fn create_parseable_cli_command() -> Command { .next_line_help(false) .help_template( r#" -{about} Join the community at https://logg.ing/community. +{about} +Join the community at https://logg.ing/community. {all-args} "#, From 9082d6fcac31b84a73f3c8681b1a5a007d4d7e3d Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 8 Apr 2024 12:20:38 +0530 Subject: [PATCH 2/5] Refactor object storage to use filter_func instead of starts_with_pattern in get_objects method --- server/src/handlers/http/cluster/mod.rs | 10 ++++++++-- server/src/handlers/http/query.rs | 1 + server/src/storage/localfs.rs | 11 ++++++----- server/src/storage/object_storage.rs | 2 +- server/src/storage/s3.rs | 9 ++------- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index f049a9c9f..347e12c43 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -117,7 +117,10 @@ pub async fn fetch_stats_from_ingesters( let obs = CONFIG .storage() .get_object_store() - .get_objects(Some(&path), ".ingester") + .get_objects( + Some(&path), + Box::new(|file_name| file_name.starts_with(".ingester")), + ) .await?; let mut ingestion_size = 0u64; let mut storage_size = 0u64; @@ -346,7 +349,10 @@ pub async fn get_ingester_info() -> anyhow::Result { let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); let arr = store - .get_objects(Some(&root_path), "ingester") + .get_objects( + Some(&root_path), + Box::new(|file_name| file_name.starts_with("ingester")), + ) .await? .iter() // this unwrap will most definateley shoot me in the foot later diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 6df8620db..456b6ab3f 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -74,6 +74,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result, - _starts_with_pattern: &str, + filter_func: Box<(dyn Fn(String) -> bool + std::marker::Send + 'static)>, ) -> Result, ObjectStorageError> { let time = Instant::now(); @@ -206,13 +206,14 @@ impl ObjectStorage for LocalFS { let mut entries = fs::read_dir(&prefix).await?; let mut res = Vec::new(); while let Some(entry) = entries.next_entry().await? { - let ingester_file = entry + let path = entry .path() .file_name() - .unwrap_or_default() + .unwrap() .to_str() - .unwrap_or_default() - .contains("ingester"); + .unwrap() + .to_owned(); + let ingester_file = filter_func(path); if !ingester_file { continue; diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 54c12c858..9efa477a2 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -69,7 +69,7 @@ pub trait ObjectStorage: Sync + 'static { async fn get_objects( &self, base_path: Option<&RelativePath>, - starts_with_pattern: &str, + filter_fun: Box bool + Send>, ) -> Result, ObjectStorageError>; async fn put_object( &self, diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 6ee1f9905..e3a5f81fb 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -412,11 +412,10 @@ impl ObjectStorage for S3 { Ok(self._get_object(path).await?) } - // TBD is this the right way or the api calls are too many? async fn get_objects( &self, base_path: Option<&RelativePath>, - starts_with_pattern: &str, + filter_func: Box bool + Send>, ) -> Result, ObjectStorageError> { let instant = Instant::now(); @@ -431,11 +430,7 @@ impl ObjectStorage for S3 { let mut res = vec![]; while let Some(meta) = list_stream.next().await.transpose()? { - let ingester_file = meta - .location - .filename() - .unwrap() - .starts_with(starts_with_pattern); + let ingester_file = filter_func(meta.location.filename().unwrap().to_string()); if !ingester_file { continue; From 3e74c1a8aa631a0894bc4143a7a183432b219b49 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 8 Apr 2024 12:22:24 +0530 Subject: [PATCH 3/5] Refactor fetch_schema method to use object storage instead of HTTP requests --- server/src/handlers/http.rs | 48 +++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index ce4acaad0..a3b3f4fc7 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -18,8 +18,11 @@ use actix_cors::Cors; use arrow_schema::Schema; +use itertools::Itertools; use serde_json::Value; +use crate::option::CONFIG; + use self::{cluster::get_ingester_info, query::Query}; pub(crate) mod about; @@ -61,32 +64,31 @@ pub fn base_path_without_preceding_slash() -> String { format!("{API_BASE_PATH}/{API_VERSION}") } +/// Fetches the schema for the specified stream. +/// +/// # Arguments +/// +/// * `stream_name` - The name of the stream to fetch the schema for. +/// +/// # Returns +/// +/// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream. pub async fn fetch_schema(stream_name: &str) -> anyhow::Result { - let mut res = vec![]; - let ima = get_ingester_info().await.unwrap(); - - for im in ima { - let uri = format!( - "{}{}/logstream/{}/schema", - im.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - let reqw = reqwest::Client::new() - .get(uri) - .header(http::header::AUTHORIZATION, im.token.clone()) - .header(http::header::CONTENT_TYPE, "application/json") - .send() - .await?; - - if reqw.status().is_success() { - let v = serde_json::from_slice(&reqw.bytes().await?)?; - res.push(v); - } - } + let path_prefix = + relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, ".stream")); + let store = CONFIG.storage().get_object_store(); + let res: Vec = store + .get_objects( + Some(&path_prefix), + Box::new(|file_name: String| file_name.contains(".schema")), + ) + .await? + .iter() + // we should be able to unwrap as we know the data is valid schema + .map(|byte_obj| serde_json::from_slice(byte_obj).unwrap()) + .collect_vec(); let new_schema = Schema::try_merge(res)?; - Ok(new_schema) } From 4f7f8119418e257aa7c8161ba3d34d52fc1539a7 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 8 Apr 2024 13:17:35 +0530 Subject: [PATCH 4/5] Refactor metadata.rs and storage.rs --- server/src/metadata.rs | 61 ++++++++++++++++++++++++------------------ server/src/storage.rs | 2 +- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 7bb38e624..8fdb7597d 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -26,7 +26,7 @@ use std::sync::{Arc, RwLock}; use crate::alerts::Alerts; use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE}; -use crate::storage::{ObjectStorage, StorageDir}; +use crate::storage::{LogStream, ObjectStorage, StorageDir}; use crate::utils::arrow::MergedRecordReader; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; @@ -208,32 +208,41 @@ impl StreamInfo { // return error in case of an error from object storage itself. for stream in storage.list_streams().await? { - let alerts = storage.get_alerts(&stream.name).await?; - let schema = storage.get_schema_on_server_start(&stream.name).await?; - let meta = storage.get_stream_metadata(&stream.name).await?; - - let schema = update_schema_from_staging(&stream.name, schema); - let schema = HashMap::from_iter( - schema - .fields - .iter() - .map(|v| (v.name().to_owned(), v.clone())), - ); - - let metadata = LogStreamMetadata { - schema, - alerts, - cache_enabled: meta.cache_enabled, - created_at: meta.created_at, - first_event_at: meta.first_event_at, - time_partition: meta.time_partition, - static_schema_flag: meta.static_schema_flag, - }; - - let mut map = self.write().expect(LOCK_EXPECT); - - map.insert(stream.name, metadata); + self.upsert_stream_info(storage, stream).await?; } + Ok(()) + } + + pub async fn upsert_stream_info( + &self, + storage: &(impl ObjectStorage + ?Sized), + stream: LogStream, + ) -> Result<(), LoadError> { + let alerts = storage.get_alerts(&stream.name).await?; + let schema = storage.get_schema_on_server_start(&stream.name).await?; + let meta = storage.get_stream_metadata(&stream.name).await?; + + let schema = update_schema_from_staging(&stream.name, schema); + let schema = HashMap::from_iter( + schema + .fields + .iter() + .map(|v| (v.name().to_owned(), v.clone())), + ); + + let metadata = LogStreamMetadata { + schema, + alerts, + cache_enabled: meta.cache_enabled, + created_at: meta.created_at, + first_event_at: meta.first_event_at, + time_partition: meta.time_partition, + static_schema_flag: meta.static_schema_flag, + }; + + let mut map = self.write().expect(LOCK_EXPECT); + + map.insert(stream.name, metadata); Ok(()) } diff --git a/server/src/storage.rs b/server/src/storage.rs index b1f619029..b8fb3ba83 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -167,7 +167,7 @@ impl ObjectStoreFormat { } } -#[derive(serde::Serialize)] +#[derive(serde::Serialize, PartialEq)] pub struct LogStream { pub name: String, } From 9f4489ebbe6234551399dea32fc7d5962b339f4c Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 8 Apr 2024 13:20:44 +0530 Subject: [PATCH 5/5] refactor ingest logic fetch stream info from store if stream info is not present in memory. error if stream info does not exist in s3 and memory --- server/src/handlers/http/cluster/mod.rs | 3 ++ server/src/handlers/http/ingest.rs | 30 +++++++++++++++---- server/src/handlers/http/logstream.rs | 5 +--- .../src/handlers/http/modal/ingest_server.rs | 5 +++- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 347e12c43..5bc5f8719 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -49,6 +49,7 @@ use super::base_path_without_preceding_slash; use super::modal::IngesterMetadata; // forward the request to all ingesters to keep them in sync +#[allow(dead_code)] pub async fn sync_streams_with_ingesters( stream_name: &str, time_partition: &str, @@ -143,6 +144,7 @@ pub async fn fetch_stats_from_ingesters( Ok(vec![qs]) } +#[allow(dead_code)] async fn send_stream_sync_request( url: &str, ingester: IngesterMetadata, @@ -186,6 +188,7 @@ async fn send_stream_sync_request( } /// send a rollback request to all ingesters +#[allow(dead_code)] async fn send_stream_rollback_request( url: &str, ingester: IngesterMetadata, diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 59b0c9f4a..673d14c4c 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -27,9 +27,9 @@ use crate::handlers::{ LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, }; -use crate::metadata::STREAM_INFO; +use crate::metadata::{self, STREAM_INFO}; use crate::option::{Mode, CONFIG}; -use crate::storage::ObjectStorageError; +use crate::storage::{LogStream, ObjectStorageError}; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_schema::{Field, Schema}; @@ -165,10 +165,28 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr .await?; } Mode::Ingest => { - return Err(PostError::Invalid(anyhow::anyhow!( - "Stream {} not found. Has it been created?", - stream_name - ))); + // here the ingest server has not found the stream + // so it should check if the stream exists in storage + let store = CONFIG.storage().get_object_store(); + let streams = store.list_streams().await?; + if !streams.contains(&LogStream { + name: stream_name.to_owned(), + }) { + log::error!("Stream {} not found", stream_name); + return Err(PostError::Invalid(anyhow::anyhow!( + "Stream {} not found. Has it been created?", + stream_name + ))); + } + metadata::STREAM_INFO + .upsert_stream_info( + &*store, + LogStream { + name: stream_name.to_owned(), + }, + ) + .await + .map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?; } } Ok(()) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 217e3741c..1fa728d81 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -26,8 +26,8 @@ use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; +use super::cluster::fetch_stats_from_ingesters; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; -use super::cluster::{fetch_stats_from_ingesters, sync_streams_with_ingesters}; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; use arrow_schema::{Field, Schema}; @@ -166,9 +166,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result