diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index e438ab8f4..da5908dbc 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -30,7 +30,6 @@ use crate::metrics::prom_utils::Metrics;
 use crate::rbac::role::model::DefaultPrivilege;
 use crate::rbac::user::User;
 use crate::stats::Stats;
-use crate::storage::get_staging_metadata;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
 use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -65,7 +64,6 @@ pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
     body: Bytes,
     stream_name: &str,
-    skip_ingestor: Option<String>,
 ) -> Result<(), StreamError> {
     let mut reqwest_headers = http_header::HeaderMap::new();
@@ -79,15 +77,7 @@ pub async fn sync_streams_with_ingestors(
     let client = reqwest::Client::new();
 
-    let final_ingestor_infos = match skip_ingestor {
-        None => ingestor_infos,
-        Some(skip_ingestor) => ingestor_infos
-            .into_iter()
-            .filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
-            .collect::<Vec<_>>(),
-    };
-
-    for ingestor in final_ingestor_infos {
+    for ingestor in ingestor_infos {
         if !utils::check_liveness(&ingestor.domain_name).await {
             log::warn!("Ingestor {} is not live", ingestor.domain_name);
             continue;
@@ -852,62 +842,3 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
 
     Ok(())
 }
-
-pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
-    let client = reqwest::Client::new();
-
-    let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
-        StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
-    })?;
-    let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
-    let token = staging_metadata.querier_auth_token.unwrap();
-
-    if !check_liveness(&querier_endpoint).await {
-        log::warn!("Querier {} is not live", querier_endpoint);
-        return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
-    }
-
-    let url = format!(
-        "{}{}/logstream/{}?skip_ingestors={}",
-        querier_endpoint,
-        base_path_without_preceding_slash(),
-        stream_name,
-        CONFIG.parseable.ingestor_endpoint,
-    );
-
-    let response = client
-        .put(&url)
-        .header(header::AUTHORIZATION, &token)
-        .send()
-        .await
-        .map_err(|err| {
-            log::error!(
-                "Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
-                &url,
-                err
-            );
-            StreamError::Network(err)
-        })?;
-
-    let status = response.status();
-
-    if !status.is_success() {
-        let response_text = response.text().await.map_err(|err| {
-            log::error!("Failed to read response text from querier: {}", &url);
-            StreamError::Network(err)
-        })?;
-
-        log::error!(
-            "Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
-            &url,
-            response_text
-        );
-
-        return Err(StreamError::Anyhow(anyhow::anyhow!(
-            "Request failed with status: {}",
-            status,
-        )));
-    }
-
-    Ok(())
-}
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 48ec1d9dc..f94faefce 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -26,13 +26,13 @@ use crate::event::{
     error::EventError,
     format::{self, EventFormat},
 };
-use crate::handlers::http::cluster::forward_create_stream_request;
+use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
 use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
 use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
-use crate::metadata::{self, STREAM_INFO};
+use crate::metadata::STREAM_INFO;
 use crate::option::{Mode, CONFIG};
-use crate::storage::{LogStream, ObjectStorageError, StreamType};
+use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_array::RecordBatch;
@@ -153,7 +153,17 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
 
     if !STREAM_INFO.stream_exists(&stream_name) {
-        return Err(PostError::StreamNotFound(stream_name));
+        // For distributed deployments, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if CONFIG.parseable.mode != Mode::All {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(PostError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(PostError::StreamNotFound(stream_name.clone()));
+        }
     }
 
     flatten_and_push_logs(req, body, stream_name).await?;
@@ -190,49 +200,25 @@ pub async fn create_stream_if_not_exists(
         stream_exists = true;
         return Ok(stream_exists);
     }
-    match &CONFIG.parseable.mode {
-        Mode::All | Mode::Query => {
-            super::logstream::create_stream(
-                stream_name.to_string(),
-                "",
-                "",
-                "",
-                "",
-                Arc::new(Schema::empty()),
-                stream_type,
-            )
-            .await?;
-        }
-        Mode::Ingest => {
-            // here the ingest server has not found the stream
-            // so it should check if the stream exists in storage
-            let store = CONFIG.storage().get_object_store();
-            let streams = store.list_streams().await?;
-            if !streams.contains(&LogStream {
-                name: stream_name.to_owned(),
-            }) {
-                match forward_create_stream_request(stream_name).await {
-                    Ok(()) => log::info!("Stream {} created", stream_name),
-                    Err(e) => {
-                        return Err(PostError::Invalid(anyhow::anyhow!(
-                            "Unable to create stream: {} using query server. Error: {}",
-                            stream_name,
-                            e.to_string(),
-                        )))
-                    }
-                };
-            }
-            metadata::STREAM_INFO
-                .upsert_stream_info(
-                    &*store,
-                    LogStream {
-                        name: stream_name.to_owned(),
-                    },
-                )
-                .await
-                .map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
-        }
+
+    // For distributed deployments, if the stream is not found in the memory map,
+    // check if it exists in the storage and, if so,
+    // create the stream and schema from storage
+    if CONFIG.parseable.mode != Mode::All {
+        return Ok(create_stream_and_schema_from_storage(stream_name).await?);
     }
+
+    super::logstream::create_stream(
+        stream_name.to_string(),
+        "",
+        "",
+        "",
+        "",
+        Arc::new(Schema::empty()),
+        stream_type,
+    )
+    .await?;
+
     Ok(stream_exists)
 }
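The guard above recurs, with small variations, in nearly every handler this patch touches: consult the in-memory map first, then fall back to hydrating from object storage. A sketch of how it could be factored into a single helper; the helper name and the simplified signatures are hypothetical, not part of this patch:

```rust
// Hypothetical refactor of the guard repeated across handlers in this patch.
// `stream_exists` and `hydrate_from_storage` stand in for
// STREAM_INFO.stream_exists and create_stream_and_schema_from_storage.
async fn stream_exists(_name: &str) -> bool {
    false // memory-map lookup in the real code
}

async fn hydrate_from_storage(_name: &str) -> Result<bool, String> {
    Ok(false) // storage lookup + memory-map insert in the real code
}

/// Returns Ok(()) when the stream is usable locally, Err(name) otherwise.
async fn ensure_stream_exists(name: &str, can_hydrate: bool) -> Result<(), String> {
    if stream_exists(name).await {
        return Ok(());
    }
    // Query/Ingest nodes may lazily pull the stream definition from storage.
    if can_hydrate {
        return match hydrate_from_storage(name).await {
            Ok(true) => Ok(()),
            Ok(false) | Err(_) => Err(name.to_owned()),
        };
    }
    Err(name.to_owned())
}

#[tokio::main]
async fn main() {
    // Unknown stream and no hydration allowed: the guard rejects it.
    assert!(ensure_stream_exists("demo", false).await.is_err());
}
```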
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 00ddff3f3..84c500709 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -20,17 +20,19 @@ use self::error::{CreateStreamError, StreamError};
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
 use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
 use super::ingest::create_stream_if_not_exists;
-use super::modal::utils::logstream_utils::create_update_stream;
+use super::modal::utils::logstream_utils::{
+    create_stream_and_schema_from_storage, create_update_stream,
+};
 use crate::alerts::Alerts;
 use crate::event::format::update_data_type_to_datetime;
 use crate::handlers::STREAM_TYPE_KEY;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::STREAM_INFO;
 use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
-use crate::option::CONFIG;
+use crate::option::{Mode, CONFIG};
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::StreamType;
-use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
+use crate::storage::{retention::Retention, StorageDir, StreamInfo};
 use crate::{catalog, event, stats};
 use crate::{metadata, validator};
@@ -82,11 +84,13 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
 }
 
 pub async fn list(_: HttpRequest) -> impl Responder {
-    let res: Vec<LogStream> = STREAM_INFO
+    //list all streams from storage
+    let res = CONFIG
+        .storage()
+        .get_object_store()
         .list_streams()
-        .into_iter()
-        .map(|stream| LogStream { name: stream })
-        .collect();
+        .await
+        .unwrap();
 
     web::Json(res)
 }
@@ -113,7 +117,22 @@ pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
 
 pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    let schema = STREAM_INFO.schema(&stream_name)?;
+    let schema = match STREAM_INFO.schema(&stream_name) {
+        Ok(schema) => schema,
+
+        //if the schema is not found in the memory map,
+        //create the stream and schema from storage
+        //and return the schema from the memory map
+        Err(_) if CONFIG.parseable.mode == Mode::Query => {
+            if create_stream_and_schema_from_storage(&stream_name).await? {
+                STREAM_INFO.schema(&stream_name)?
+            } else {
+                return Err(StreamError::StreamNotFound(stream_name.clone()));
+            }
+        }
+        Err(_) => return Err(StreamError::StreamNotFound(stream_name)),
+    };
+
     Ok((web::Json(schema), StatusCode::OK))
 }
@@ -180,7 +199,17 @@ pub async fn put_alert(
     validator::alert(&alerts)?;
 
     if !STREAM_INFO.stream_initialized(&stream_name)? {
-        return Err(StreamError::UninitializedLogstream);
+        // For query mode, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::UninitializedLogstream);
+        }
     }
 
     let schema = STREAM_INFO.schema(&stream_name)?;
@@ -218,7 +247,17 @@ pub async fn put_alert(
 
 pub async fn get_retention(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     if !STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name.to_string()));
+        // For query mode, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     let retention = STREAM_INFO.get_retention(&stream_name);
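For reference, the put_retention handler that follows deserializes the request body with serde_json::from_value into Retention. A self-contained sketch of that round-trip using a stand-in struct; the real Retention lives in storage::retention, and its exact shape (it may wrap a list of such tasks) is an assumption here:

```rust
use serde::Deserialize;

// Stand-in for storage::retention::Retention; field names are assumptions
// for illustration, not the crate's actual definition.
#[derive(Debug, Deserialize)]
struct RetentionTask {
    description: Option<String>,
    action: String,   // e.g. "delete"
    duration: String, // e.g. "30d"
}

fn main() -> Result<(), serde_json::Error> {
    let body = serde_json::json!({ "action": "delete", "duration": "30d" });
    // Mirrors `let retention: Retention = serde_json::from_value(body)?` in the handler.
    let task: RetentionTask = serde_json::from_value(body)?;
    println!("{task:?}");
    Ok(())
}
```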
@@ -239,6 +278,21 @@ pub async fn put_retention(
     body: web::Json<Value>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
+
+    if !STREAM_INFO.stream_exists(&stream_name) {
+        // For query mode, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
+    }
+
     let body = body.into_inner();
     let retention: Retention = match serde_json::from_value(body) {
@@ -327,8 +381,18 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, StreamError> {
 
 pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+    if !STREAM_INFO.stream_exists(&stream_name) {
+        // For query mode, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if cfg!(not(test)) && CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     let query_string = req.query_string();
@@ -496,8 +560,15 @@ pub async fn create_stream(
 
 pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+    if !STREAM_INFO.stream_exists(&stream_name) {
+        if CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     let store = CONFIG.storage().get_object_store();
@@ -539,8 +610,18 @@ pub async fn put_stream_hot_tier(
     body: web::Json<StreamHotTier>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+    if !STREAM_INFO.stream_exists(&stream_name) {
+        // For query mode, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
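The cfg!(not(test)) in the get_stats guard above keeps unit tests from attempting the storage round-trip. Unlike #[cfg(test)], cfg! expands to a compile-time boolean, so both branches are still compiled and type-checked in every build; a minimal illustration:

```rust
fn maybe_hydrate() -> &'static str {
    // cfg!(test) is a compile-time boolean: true only when built under `cargo test`.
    // Both arms are always type-checked, but the condition is constant-folded.
    if cfg!(not(test)) {
        "would consult object storage"
    } else {
        "test build: skip the storage lookup"
    }
}

fn main() {
    println!("{}", maybe_hydrate());
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn skips_storage_in_tests() {
        assert_eq!(maybe_hydrate(), "test build: skip the storage lookup");
    }
}
```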
@@ -589,8 +670,18 @@ pub async fn put_stream_hot_tier(
 
 pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+    if !STREAM_INFO.stream_exists(&stream_name) {
+        // For query mode, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     if CONFIG.parseable.hot_tier_storage_path.is_none() {
@@ -614,8 +705,18 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
 
 pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+    if !STREAM_INFO.stream_exists(&stream_name) {
+        // For query mode, if the stream is not found in the memory map,
+        // check if it exists in the storage and, if so,
+        // create the stream and schema from storage
+        if CONFIG.parseable.mode == Mode::Query {
+            match create_stream_and_schema_from_storage(&stream_name).await {
+                Ok(true) => {}
+                Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
+            }
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     if CONFIG.parseable.hot_tier_storage_path.is_none() {
@@ -654,7 +755,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
             header::CONTENT_TYPE,
             HeaderValue::from_static("application/json"),
         );
-        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
+        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
     }
     Ok(())
 }
diff --git a/server/src/handlers/http/modal/ingest/ingester_ingest.rs b/server/src/handlers/http/modal/ingest/ingestor_ingest.rs
similarity index 58%
rename from server/src/handlers/http/modal/ingest/ingester_ingest.rs
rename to server/src/handlers/http/modal/ingest/ingestor_ingest.rs
index f7725254a..e91a27614 100644
--- a/server/src/handlers/http/modal/ingest/ingester_ingest.rs
+++ b/server/src/handlers/http/modal/ingest/ingestor_ingest.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use actix_web::{HttpRequest, HttpResponse};
 use bytes::Bytes;
diff --git a/server/src/handlers/http/modal/ingest/ingester_logstream.rs b/server/src/handlers/http/modal/ingest/ingestor_logstream.rs
similarity index 68%
rename from server/src/handlers/http/modal/ingest/ingester_logstream.rs
rename to server/src/handlers/http/modal/ingest/ingestor_logstream.rs
index f5ece7487..88ad68765 100644
--- a/server/src/handlers/http/modal/ingest/ingester_logstream.rs
+++ b/server/src/handlers/http/modal/ingest/ingestor_logstream.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use actix_web::{web, HttpRequest, Responder};
 use bytes::Bytes;
 use http::StatusCode;
@@ -7,7 +25,10 @@ use crate::{
     catalog::remove_manifest_from_snapshot,
     event,
     handlers::http::{
-        logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream,
+        logstream::error::StreamError,
+        modal::utils::logstream_utils::{
+            create_stream_and_schema_from_storage, create_update_stream,
+        },
     },
     metadata::{self, STREAM_INFO},
     option::CONFIG,
@@ -21,26 +42,35 @@ pub async fn retention_cleanup(
 ) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let storage = CONFIG.storage().get_object_store();
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        log::error!("Stream {} not found", stream_name.clone());
+    // if the stream is not found in the memory map,
+    // check if it exists in the storage and, if so,
+    // create the stream and schema from storage
+    if !metadata::STREAM_INFO.stream_exists(&stream_name)
+        && !create_stream_and_schema_from_storage(&stream_name)
+            .await
+            .unwrap_or(false)
+    {
        return Err(StreamError::StreamNotFound(stream_name.clone()));
     }
+
     let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
     let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
-    let mut first_event_at: Option<String> = None;
-    if let Err(err) = res {
-        log::error!("Failed to update manifest list in the snapshot {err:?}")
-    } else {
-        first_event_at = res.unwrap();
-    }
+    let first_event_at: Option<String> = res.unwrap_or_default();
 
     Ok((first_event_at, StatusCode::OK))
 }
 
 pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+    // if the stream is not found in the memory map,
+    // check if it exists in the storage and, if so,
+    // create the stream and schema from storage
+    if !metadata::STREAM_INFO.stream_exists(&stream_name)
+        && !create_stream_and_schema_from_storage(&stream_name)
+            .await
+            .unwrap_or(false)
+    {
+        return Err(StreamError::StreamNotFound(stream_name.clone()));
     }
 
     metadata::STREAM_INFO.delete_stream(&stream_name);
diff --git a/server/src/handlers/http/modal/ingest/ingester_rbac.rs b/server/src/handlers/http/modal/ingest/ingestor_rbac.rs
similarity index 83%
rename from server/src/handlers/http/modal/ingest/ingester_rbac.rs
rename to server/src/handlers/http/modal/ingest/ingestor_rbac.rs
index 157b52959..f25abe688 100644
--- a/server/src/handlers/http/modal/ingest/ingester_rbac.rs
+++ b/server/src/handlers/http/modal/ingest/ingestor_rbac.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use std::collections::HashSet;
 
 use actix_web::{web, Responder};
diff --git a/server/src/handlers/http/modal/ingest/ingester_role.rs b/server/src/handlers/http/modal/ingest/ingestor_role.rs
similarity index 51%
rename from server/src/handlers/http/modal/ingest/ingester_role.rs
rename to server/src/handlers/http/modal/ingest/ingestor_role.rs
index 0ad41e765..499157136 100644
--- a/server/src/handlers/http/modal/ingest/ingester_role.rs
+++ b/server/src/handlers/http/modal/ingest/ingestor_role.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use actix_web::{web, HttpResponse, Responder};
 use bytes::Bytes;
diff --git a/server/src/handlers/http/modal/ingest/mod.rs b/server/src/handlers/http/modal/ingest/mod.rs
index 26ed76438..c6a32dff7 100644
--- a/server/src/handlers/http/modal/ingest/mod.rs
+++ b/server/src/handlers/http/modal/ingest/mod.rs
@@ -1,3 +1,21 @@
-pub mod ingester_logstream;
-pub mod ingester_rbac;
-pub mod ingester_role;
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+pub mod ingestor_logstream;
+pub mod ingestor_rbac;
+pub mod ingestor_role;
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index f6e54ce8e..1e0e9dd21 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -40,9 +40,9 @@ use crate::sync;
 
 use std::sync::Arc;
 
-use super::ingest::ingester_logstream;
-use super::ingest::ingester_rbac;
-use super::ingest::ingester_role;
+use super::ingest::ingestor_logstream;
+use super::ingest::ingestor_rbac;
+use super::ingest::ingestor_role;
 use super::server::Server;
 use super::ssl_acceptor::get_ssl_acceptor;
 use super::IngestorMetadata;
@@ -251,7 +251,7 @@ impl IngestServer {
             )
             .service(
                 resource("/{name}/sync")
-                    .route(web::put().to(ingester_role::put).authorize(Action::PutRole)),
+                    .route(web::put().to(ingestor_role::put).authorize(Action::PutRole)),
             )
     }
     // get the user webscope
@@ -262,13 +262,13 @@ impl IngestServer {
                 // PUT /user/{username}/sync => Sync creation of a new user
                 .route(
                     web::post()
-                        .to(ingester_rbac::post_user)
+                        .to(ingestor_rbac::post_user)
                        .authorize(Action::PutUser),
                 )
                 // DELETE /user/{username} => Sync deletion of a user
                 .route(
                     web::delete()
-                        .to(ingester_rbac::delete_user)
+                        .to(ingestor_rbac::delete_user)
                        .authorize(Action::DeleteUser),
                 )
                 .wrap(DisAllowRootUser),
@@ -278,7 +278,7 @@ impl IngestServer {
                 // PUT /user/{username}/roles => Put roles for user
                 .route(
                     web::put()
-                        .to(ingester_rbac::put_role)
+                        .to(ingestor_rbac::put_role)
                        .authorize(Action::PutUserRoles)
                        .wrap(DisAllowRootUser),
                 ),
@@ -288,7 +288,7 @@ impl IngestServer {
                 // POST /user/{username}/generate-new-password => reset password for this user
                 .route(
                     web::post()
-                        .to(ingester_rbac::post_gen_password)
+                        .to(ingestor_rbac::post_gen_password)
                        .authorize(Action::PutUser)
                        .wrap(DisAllowRootUser),
                 ),
@@ -311,13 +311,13 @@ impl IngestServer {
             // DELETE "/logstream/{logstream}/sync" ==> Sync deletion of a log stream
             .route(
                 web::delete()
-                    .to(ingester_logstream::delete)
+                    .to(ingestor_logstream::delete)
                    .authorize(Action::DeleteStream),
             )
             // PUT "/logstream/{logstream}/sync" ==> Sync creation of a new log stream
             .route(
                 web::put()
-                    .to(ingester_logstream::put_stream)
+                    .to(ingestor_logstream::put_stream)
                    .authorize_for_stream(Action::CreateStream),
             ),
         )
@@ -342,13 +342,13 @@ impl IngestServer {
             // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
             .route(
                 web::put()
-                    .to(ingester_logstream::put_enable_cache)
+                    .to(ingestor_logstream::put_enable_cache)
                    .authorize_for_stream(Action::PutCacheEnabled),
             )
             // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream
             .route(
                 web::get()
-                    .to(ingester_logstream::get_cache_enabled)
+                    .to(ingestor_logstream::get_cache_enabled)
                    .authorize_for_stream(Action::GetCacheEnabled),
             ),
         )
@@ -356,7 +356,7 @@ impl IngestServer {
             web::scope("/retention").service(
                 web::resource("/cleanup").route(
                     web::post()
-                        .to(ingester_logstream::retention_cleanup)
+                        .to(ingestor_logstream::retention_cleanup)
                        .authorize_for_stream(Action::PutRetention),
                 ),
             ),
diff --git a/server/src/handlers/http/modal/query/mod.rs b/server/src/handlers/http/modal/query/mod.rs
index 704f9ca54..8ef11dd60 100644
--- a/server/src/handlers/http/modal/query/mod.rs
+++ b/server/src/handlers/http/modal/query/mod.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 pub mod querier_ingest;
 pub mod querier_logstream;
 pub mod querier_rbac;
diff --git a/server/src/handlers/http/modal/query/querier_ingest.rs b/server/src/handlers/http/modal/query/querier_ingest.rs
index 2e5e140c6..1eff3999a 100644
--- a/server/src/handlers/http/modal/query/querier_ingest.rs
+++ b/server/src/handlers/http/modal/query/querier_ingest.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use crate::handlers::http::ingest::PostError;
 use actix_web::{HttpRequest, HttpResponse};
 use bytes::Bytes;
diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs
index c67d173ae..86e887e3b 100644
--- a/server/src/handlers/http/modal/query/querier_logstream.rs
+++ b/server/src/handlers/http/modal/query/querier_logstream.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use core::str;
 
 use std::fs;
@@ -5,7 +23,6 @@ use actix_web::{web, HttpRequest, Responder};
 use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
-use serde::Deserialize;
 use tokio::sync::Mutex;
 
 static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
@@ -20,7 +37,9 @@ use crate::{
         utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
     },
     logstream::{error::StreamError, get_stats_date},
-    modal::utils::logstream_utils::create_update_stream,
+    modal::utils::logstream_utils::{
+        create_stream_and_schema_from_storage, create_update_stream,
+    },
 },
 hottier::HotTierManager,
 metadata::{self, STREAM_INFO},
@@ -31,8 +50,16 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+
+    // if the stream is not found in the memory map,
+    // check if it exists in the storage and, if so,
+    // create the stream and schema from storage
+    if !metadata::STREAM_INFO.stream_exists(&stream_name)
+        && !create_stream_and_schema_from_storage(&stream_name)
+            .await
+            .unwrap_or(false)
+    {
+        return Err(StreamError::StreamNotFound(stream_name.clone()));
     }
 
     let objectstore = CONFIG.storage().get_object_store();
@@ -79,22 +106,13 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
 }
 
-#[derive(Deserialize)]
-pub struct PutStreamQuery {
-    skip_ingestors: Option<String>,
-}
-
-pub async fn put_stream(
-    req: HttpRequest,
-    body: Bytes,
-    info: web::Query<PutStreamQuery>,
-) -> Result<impl Responder, StreamError> {
+pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
     let _ = CREATE_STREAM_LOCK.lock().await;
     let headers = create_update_stream(&req, &body, &stream_name).await?;
 
-    sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?;
+    sync_streams_with_ingestors(headers, body, &stream_name).await?;
 
     Ok(("Log stream created", StatusCode::OK))
 }
@@ -102,8 +120,15 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
 pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+    // if the stream is not found in the memory map,
+    // check if it exists in the storage and, if so,
+    // create the stream and schema from storage
+    if !metadata::STREAM_INFO.stream_exists(&stream_name)
+        && !create_stream_and_schema_from_storage(&stream_name)
+            .await
+            .unwrap_or(false)
+    {
+        return Err(StreamError::StreamNotFound(stream_name.clone()));
     }
 
     let query_string = req.query_string();
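put_stream above still funnels every stream create/update through CREATE_STREAM_LOCK, so two concurrent PUTs for the same name cannot race between the existence check and the metadata write. A minimal sketch of the const-initialized tokio mutex pattern it relies on:

```rust
use tokio::sync::Mutex;

// Same shape as CREATE_STREAM_LOCK in querier_logstream.rs: a static
// mutex built in a const context, no lazy_static/once_cell required.
static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());

async fn put_stream(stream_name: &str) {
    // Holding the guard serializes the whole check-then-create sequence
    // with respect to every other put_stream call.
    let _guard = CREATE_STREAM_LOCK.lock().await;
    println!("creating {stream_name} while holding the lock");
}

#[tokio::main]
async fn main() {
    put_stream("demo").await;
}
```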
diff --git a/server/src/handlers/http/modal/query/querier_rbac.rs b/server/src/handlers/http/modal/query/querier_rbac.rs
index a5b88c33b..ae2af1c2d 100644
--- a/server/src/handlers/http/modal/query/querier_rbac.rs
+++ b/server/src/handlers/http/modal/query/querier_rbac.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use std::collections::HashSet;
 
 use actix_web::{web, Responder};
diff --git a/server/src/handlers/http/modal/query/querier_role.rs b/server/src/handlers/http/modal/query/querier_role.rs
index c17489273..b9930579c 100644
--- a/server/src/handlers/http/modal/query/querier_role.rs
+++ b/server/src/handlers/http/modal/query/querier_role.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use actix_web::{web, HttpResponse, Responder};
 use bytes::Bytes;
diff --git a/server/src/handlers/http/modal/utils/ingest_utils.rs b/server/src/handlers/http/modal/utils/ingest_utils.rs
index 9d29d0a76..81ccfbd44 100644
--- a/server/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/server/src/handlers/http/modal/utils/ingest_utils.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use std::{
     collections::{BTreeMap, HashMap},
     sync::Arc,
diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs
index 65628eca5..0081a258a 100644
--- a/server/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/server/src/handlers/http/modal/utils/logstream_utils.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use std::{collections::HashMap, num::NonZeroU32, sync::Arc};
 
 use actix_web::{http::header::HeaderMap, HttpRequest};
@@ -12,9 +30,9 @@ use crate::{
         TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
     },
     metadata::{self, STREAM_INFO},
-    option::CONFIG,
+    option::{Mode, CONFIG},
     static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
-    storage::StreamType,
+    storage::{LogStream, ObjectStoreFormat, StreamType},
     validator,
 };
@@ -28,11 +46,11 @@ pub async fn create_update_stream(
         time_partition_limit,
         custom_partition,
         static_schema_flag,
-        update_stream,
+        update_stream_flag,
         stream_type,
     ) = fetch_headers_from_put_stream_request(req);
 
-    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
+    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" {
         return Err(StreamError::Custom {
             msg: format!(
                 "Logstream {stream_name} already exists, please create a new log stream with unique name"
             ),
         });
     }
@@ -41,43 +59,36 @@ pub async fn create_update_stream(
-    if update_stream == "true" {
-        if !STREAM_INFO.stream_exists(stream_name) {
-            return Err(StreamError::StreamNotFound(stream_name.to_string()));
-        }
-        if !time_partition.is_empty() {
-            return Err(StreamError::Custom {
-                msg: "Altering the time partition of an existing stream is restricted.".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
+    if !metadata::STREAM_INFO.stream_exists(stream_name)
+        && CONFIG.parseable.mode == Mode::Query
+        && create_stream_and_schema_from_storage(stream_name).await?
+    {
+        return Err(StreamError::Custom {
+            msg: format!(
+                "Logstream {stream_name} already exists, please create a new log stream with unique name"
+            ),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
 
-        if !static_schema_flag.is_empty() {
-            return Err(StreamError::Custom {
-                msg: "Altering the schema of an existing stream is restricted.".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
+    if update_stream_flag == "true" {
+        return update_stream(
+            req,
+            stream_name,
+            &time_partition,
+            &static_schema_flag,
+            &time_partition_limit,
+            &custom_partition,
+        )
+        .await;
+    }
 
-        if !time_partition_limit.is_empty() {
-            let time_partition_days = validate_time_partition_limit(&time_partition_limit)?;
-            update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days)
-                .await?;
-            return Ok(req.headers().clone());
-        }
+    let time_partition_in_days = if !time_partition_limit.is_empty() {
+        validate_time_partition_limit(&time_partition_limit)?
+    } else {
+        ""
+    };
 
-        if !custom_partition.is_empty() {
-            validate_custom_partition(&custom_partition)?;
-            update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?;
-        } else {
-            update_custom_partition_in_stream(stream_name.to_string(), "").await?;
-        }
-        return Ok(req.headers().clone());
-    }
-    let mut time_partition_in_days = "";
-    if !time_partition_limit.is_empty() {
-        time_partition_in_days = validate_time_partition_limit(&time_partition_limit)?;
-    }
     if !custom_partition.is_empty() {
         validate_custom_partition(&custom_partition)?;
     }
@@ -108,6 +119,51 @@ pub async fn create_update_stream(
     Ok(req.headers().clone())
 }
 
+async fn update_stream(
+    req: &HttpRequest,
+    stream_name: &str,
+    time_partition: &str,
+    static_schema_flag: &str,
+    time_partition_limit: &str,
+    custom_partition: &str,
+) -> Result<HeaderMap, StreamError> {
+    if !STREAM_INFO.stream_exists(stream_name) {
+        return Err(StreamError::StreamNotFound(stream_name.to_string()));
+    }
+    if !time_partition.is_empty() {
+        return Err(StreamError::Custom {
+            msg: "Altering the time partition of an existing stream is restricted.".to_string(),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+    if !static_schema_flag.is_empty() {
+        return Err(StreamError::Custom {
+            msg: "Altering the schema of an existing stream is restricted.".to_string(),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+    if !time_partition_limit.is_empty() {
+        let time_partition_days = validate_time_partition_limit(time_partition_limit)?;
+        update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days).await?;
+        return Ok(req.headers().clone());
+    }
+    validate_and_update_custom_partition(stream_name, custom_partition).await?;
+    return Ok(req.headers().clone());
+}
+
+async fn validate_and_update_custom_partition(
+    stream_name: &str,
+    custom_partition: &str,
+) -> Result<(), StreamError> {
+    if !custom_partition.is_empty() {
+        validate_custom_partition(custom_partition)?;
+        update_custom_partition_in_stream(stream_name.to_string(), custom_partition).await?;
+    } else {
+        update_custom_partition_in_stream(stream_name.to_string(), "").await?;
+    }
+    Ok(())
+}
+
 pub fn fetch_headers_from_put_stream_request(
     req: &HttpRequest,
 ) -> (String, String, String, String, String, String) {
@@ -300,7 +356,6 @@ pub async fn update_custom_partition_in_stream(
             }
         }
     }
-
     let storage = CONFIG.storage().get_object_store();
     if let Err(err) = storage
         .update_custom_partition_in_stream(&stream_name, custom_partition)
         .await
@@ -378,3 +433,61 @@ pub async fn create_stream(
     }
     Ok(())
 }
+
+/// List all streams from storage;
+/// if the stream exists in storage, create the stream and schema from storage
+/// and add them to the memory map
+pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<bool, StreamError> {
+    // Proceed to create log stream if it doesn't exist
+    let storage = CONFIG.storage().get_object_store();
+    let streams = storage.list_streams().await?;
+    if streams.contains(&LogStream {
+        name: stream_name.to_owned(),
+    }) {
+        let mut stream_metadata = ObjectStoreFormat::default();
+        let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?;
+        if !stream_metadata_bytes.is_empty() {
+            stream_metadata = serde_json::from_slice::<ObjectStoreFormat>(&stream_metadata_bytes)?;
+        }
+
+        let mut schema = Arc::new(Schema::empty());
+        let schema_bytes = storage.create_schema_from_ingestor(stream_name).await?;
+        if !schema_bytes.is_empty() {
+            schema = serde_json::from_slice::<Arc<Schema>>(&schema_bytes)?;
+        }
+
+        let mut static_schema: HashMap<String, Arc<Field>> = HashMap::new();
+
+        for (field_name, field) in schema
+            .fields()
+            .iter()
+            .map(|field| (field.name().to_string(), field.clone()))
+        {
+            static_schema.insert(field_name, field);
+        }
+
+        let time_partition = stream_metadata.time_partition.as_deref().unwrap_or("");
+        let time_partition_limit = stream_metadata
+            .time_partition_limit
+            .as_deref()
+            .unwrap_or("");
+        let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
+        let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
+        let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
+
+        metadata::STREAM_INFO.add_stream(
+            stream_name.to_string(),
+            stream_metadata.created_at,
+            time_partition.to_string(),
+            time_partition_limit.to_string(),
+            custom_partition.to_string(),
+            static_schema_flag.to_string(),
+            static_schema,
+            stream_type,
+        );
+    } else {
+        return Ok(false);
+    }
+
+    Ok(true)
+}
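The bool returned by create_stream_and_schema_from_storage carries the whole contract: Ok(true) means the stream existed in storage and the memory map is now hydrated; Ok(false) means storage has no such stream. A schematic caller, with error types simplified so the sketch stays self-contained:

```rust
// Schematic caller mirroring the handlers above; `hydrate` stands in for
// create_stream_and_schema_from_storage and the error types are simplified.
async fn hydrate(_stream_name: &str) -> Result<bool, ()> {
    Ok(false) // storage lookup + memory-map insert in the real code
}

async fn guard(stream_name: &str) -> Result<(), String> {
    match hydrate(stream_name).await {
        // storage had the stream: the memory map is populated, proceed
        Ok(true) => Ok(()),
        // storage lookup failed or the stream is absent: surface "not found"
        Ok(false) | Err(_) => Err(format!("stream {stream_name} not found")),
    }
}

#[tokio::main]
async fn main() {
    assert!(guard("demo").await.is_err());
}
```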
diff --git a/server/src/handlers/http/modal/utils/mod.rs b/server/src/handlers/http/modal/utils/mod.rs
index 7ec7e1cbd..61930d43d 100644
--- a/server/src/handlers/http/modal/utils/mod.rs
+++ b/server/src/handlers/http/modal/utils/mod.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 pub mod ingest_utils;
 pub mod logstream_utils;
 pub mod rbac_utils;
diff --git a/server/src/handlers/http/modal/utils/rbac_utils.rs b/server/src/handlers/http/modal/utils/rbac_utils.rs
index 195a69a69..fb8d2e276 100644
--- a/server/src/handlers/http/modal/utils/rbac_utils.rs
+++ b/server/src/handlers/http/modal/utils/rbac_utils.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use crate::{
     option::CONFIG,
     storage::{self, ObjectStorageError, StorageMetadata},
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 8fe9c8229..f99b170f2 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -33,6 +33,7 @@ use std::time::Instant;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
+use crate::metadata::STREAM_INFO;
 use arrow_array::RecordBatch;
 
 use crate::event::commit_schema;
@@ -51,6 +52,8 @@ use crate::storage::object_storage::commit_schema_to_storage;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 
+use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
+
 /// Query Request through http endpoint.
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "camelCase")]
@@ -68,12 +71,19 @@ pub struct Query {
 pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
     let session_state = QUERY_SESSION.state();
-
-    // get the logical plan and extract the table name
-    let raw_logical_plan = session_state
+    let raw_logical_plan = match session_state
         .create_logical_plan(&query_request.query)
-        .await?;
-
+        .await
+    {
+        Ok(raw_logical_plan) => raw_logical_plan,
+        Err(_) => {
+            //if logical plan creation fails, create streams and try again
+            create_streams_for_querier().await;
+            session_state
+                .create_logical_plan(&query_request.query)
+                .await?
+        }
+    };
     // create a visitor to extract the table name
     let mut visitor = TableScanVisitor::default();
     let _ = raw_logical_plan.visit(&mut visitor);
@@ -178,6 +188,22 @@ pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
     Ok(())
 }
 
+/// Create streams for the querier if they do not exist:
+/// fetch the stream lists from memory and from storage, and
+/// hydrate the memory map from storage for any stream missing from it
+pub async fn create_streams_for_querier() {
+    let querier_streams = STREAM_INFO.list_streams();
+    let store = CONFIG.storage().get_object_store();
+    let storage_streams = store.list_streams().await.unwrap();
+    for stream in storage_streams {
+        let stream_name = stream.name;
+
+        if !querier_streams.contains(&stream_name) {
+            let _ = create_stream_and_schema_from_storage(&stream_name).await;
+        }
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 pub async fn put_results_in_cache(
     cache_results: Option<&str>,
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 21b5e100c..c675f4714 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -21,7 +21,6 @@ use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 use chrono::{Local, NaiveDateTime};
 use itertools::Itertools;
 use once_cell::sync::Lazy;
-use relative_path::RelativePathBuf;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
@@ -33,12 +32,8 @@ use crate::metrics::{
     EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED,
     LIFETIME_EVENTS_INGESTED_SIZE,
 };
-use crate::option::{Mode, CONFIG};
 use crate::storage::retention::Retention;
-use crate::storage::{
-    LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType, STREAM_METADATA_FILE_NAME,
-    STREAM_ROOT_DIRECTORY,
-};
+use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType};
 use crate::utils::arrow::MergedRecordReader;
 use derive_more::{Deref, DerefMut};
@@ -467,44 +462,7 @@ pub async fn load_stream_metadata_on_server_start(
     }
     let schema = update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?;
-    let mut retention = meta.retention.clone();
-    let mut time_partition = meta.time_partition.clone();
-    let mut time_partition_limit = meta.time_partition_limit.clone();
-    let mut custom_partition = meta.custom_partition.clone();
-    let mut cache_enabled = meta.cache_enabled;
-    let mut static_schema_flag = meta.static_schema_flag.clone();
-    let mut stream_type = meta.stream_type.clone();
-    if CONFIG.parseable.mode == Mode::Ingest {
-        storage.put_schema(stream_name, &schema).await?;
-        // get the base stream metadata
-        let bytes = storage
-            .get_object(&RelativePathBuf::from_iter([
-                stream_name,
-                STREAM_ROOT_DIRECTORY,
-                STREAM_METADATA_FILE_NAME,
-            ]))
-            .await?;
-        let querier_meta: ObjectStoreFormat = serde_json::from_slice(&bytes).unwrap();
-        retention.clone_from(&querier_meta.retention);
-        time_partition.clone_from(&querier_meta.time_partition);
-        time_partition_limit.clone_from(&querier_meta.time_partition_limit);
-        custom_partition.clone_from(&querier_meta.custom_partition);
-        cache_enabled.clone_from(&querier_meta.cache_enabled);
-        static_schema_flag.clone_from(&querier_meta.static_schema_flag);
-        stream_type.clone_from(&querier_meta.stream_type);
-        meta = ObjectStoreFormat {
-            retention: retention.clone(),
-            cache_enabled,
-            time_partition: time_partition.clone(),
-            time_partition_limit: time_partition_limit.clone(),
-            custom_partition: custom_partition.clone(),
-            static_schema_flag: static_schema_flag.clone(),
-            stream_type: stream_type.clone(),
-            ..meta.clone()
-        };
-        storage.put_stream_manifest(stream_name, &meta).await?;
-    }
-
+    storage.put_schema(stream_name, &schema).await?;
     //load stats from storage
     let stats = meta.stats;
     fetch_stats_from_storage(stream_name, stats).await;
@@ -522,14 +480,14 @@ pub async fn load_stream_metadata_on_server_start(
     let metadata = LogStreamMetadata {
         schema,
         alerts,
-        retention,
-        cache_enabled,
-        created_at: meta.created_at.clone(),
-        first_event_at: meta.first_event_at.clone(),
-        time_partition: meta.time_partition.clone(),
-        time_partition_limit,
-        custom_partition,
-        static_schema_flag: meta.static_schema_flag.clone(),
+        retention: meta.retention,
+        cache_enabled: meta.cache_enabled,
+        created_at: meta.created_at,
+        first_event_at: meta.first_event_at,
+        time_partition: meta.time_partition,
+        time_partition_limit: meta.time_partition_limit,
+        custom_partition: meta.custom_partition,
+        static_schema_flag: meta.static_schema_flag,
         hot_tier_enabled: meta.hot_tier_enabled,
         stream_type: meta.stream_type,
     };
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 2b1d8a5e5..c57eafcff 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -28,9 +28,9 @@ use crate::{
     metadata::load_stream_metadata_on_server_start,
     option::{validation::human_size_to_bytes, Config, Mode, CONFIG},
     storage::{
-        object_storage::{parseable_json_path, stream_json_path},
+        object_storage::{parseable_json_path, schema_path, stream_json_path},
         ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
-        SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+        STREAM_ROOT_DIRECTORY,
     },
 };
 use arrow_schema::Schema;
@@ -65,24 +65,43 @@ pub async fn run_metadata_migration(
     if let Some(storage_metadata) = storage_metadata {
         match get_version(&storage_metadata) {
             Some("v1") => {
+                //migrate to the latest version and
+                //remove the querier endpoint and token from storage metadata
                 let mut metadata = metadata_migration::v1_v3(storage_metadata);
                 metadata = metadata_migration::v3_v4(metadata);
+                metadata = metadata_migration::v4_v5(metadata);
+                metadata = metadata_migration::remove_querier_metadata(metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
             Some("v2") => {
+                //migrate to the latest version and
+                //remove the querier endpoint and token from storage metadata
                 let mut metadata = metadata_migration::v2_v3(storage_metadata);
                 metadata = metadata_migration::v3_v4(metadata);
+                metadata = metadata_migration::v4_v5(metadata);
+                metadata = metadata_migration::remove_querier_metadata(metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
             Some("v3") => {
-                let metadata = metadata_migration::v3_v4(storage_metadata);
+                //migrate to the latest version and
+                //remove the querier endpoint and token from storage metadata
+                let mut metadata = metadata_migration::v3_v4(storage_metadata);
+                metadata = metadata_migration::v4_v5(metadata);
+                metadata = metadata_migration::remove_querier_metadata(metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
             Some("v4") => {
-                let metadata = metadata_migration::v4_v5(storage_metadata);
+                //migrate to the latest version and
+                //remove the querier endpoint and token from storage metadata
+                let mut metadata = metadata_migration::v4_v5(storage_metadata);
+                metadata = metadata_migration::remove_querier_metadata(metadata);
+                put_remote_metadata(&*object_store, &metadata).await?;
+            }
+            _ => {
+                //remove the querier endpoint and token from storage metadata
+                let metadata = metadata_migration::remove_querier_metadata(storage_metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
-            _ => (),
         }
     }
@@ -114,7 +134,6 @@ pub async fn run_metadata_migration(
 pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
     let storage = config.storage().get_object_store();
     let streams = storage.list_streams().await?;
-
     for stream in streams {
         migration_stream(&stream.name, &*storage).await?;
         if CONFIG.parseable.hot_tier_storage_path.is_some() {
@@ -156,11 +175,49 @@ async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> {
 
 async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
     let mut arrow_schema: Schema = Schema::empty();
 
-    let schema_path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
-    let schema = storage.get_object(&schema_path).await?;
+    //check if a schema exists for this node;
+    //if not, create one from the querier's schema in storage;
+    //if the querier's copy is also absent, create one from an ingestor schema in storage
+    let schema_path = schema_path(stream);
+    let schema = if let Ok(schema) = storage.get_object(&schema_path).await {
+        schema
+    } else {
+        let querier_schema = storage
+            .create_schema_from_querier(stream)
+            .await
+            .unwrap_or_default();
+        if !querier_schema.is_empty() {
+            querier_schema
+        } else {
+            storage
+                .create_schema_from_ingestor(stream)
+                .await
+                .unwrap_or_default()
+        }
+    };
+
+    //check if stream.json exists for this node;
+    //if not, create it from the querier's stream.json in storage;
+    //if the querier's copy is also absent, create it from an ingestor stream.json in storage
     let path = stream_json_path(stream);
-    let stream_metadata = storage.get_object(&path).await.unwrap_or_default();
+    let stream_metadata = if let Ok(stream_metadata) = storage.get_object(&path).await {
+        stream_metadata
+    } else {
+        let querier_stream = storage
+            .create_stream_from_querier(stream)
+            .await
+            .unwrap_or_default();
+        if !querier_stream.is_empty() {
+            querier_stream
+        } else {
+            storage
+                .create_stream_from_ingestor(stream)
+                .await
+                .unwrap_or_default()
+        }
+    };
+
     let mut stream_meta_found = true;
     if stream_metadata.is_empty() {
         if CONFIG.parseable.mode != Mode::Ingest {
@@ -172,7 +229,6 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
     if stream_meta_found {
         stream_metadata_value =
             serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
-
         let version = stream_metadata_value
             .as_object()
             .and_then(|meta| meta.get("version"))
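Every version arm in run_metadata_migration now ends the same way: apply the remaining migrations, then strip the querier fields. One way to keep the arms from drifting is to resume a single step pipeline at the detected version, so no step can ever run twice; an illustrative alternative with stand-in migrations, not what the patch does:

```rust
use serde_json::{json, Value};

// Stand-ins for metadata_migration::{v3_v4, v4_v5, remove_querier_metadata}.
fn v3_v4(mut m: Value) -> Value { m["version"] = json!("v4"); m }
fn v4_v5(mut m: Value) -> Value { m["version"] = json!("v5"); m }
fn remove_querier_metadata(mut m: Value) -> Value {
    if let Some(map) = m.as_object_mut() {
        map.remove("querier_endpoint");
        map.remove("querier_auth_token");
    }
    m
}

fn migrate_from(version: &str, metadata: Value) -> Value {
    // Each detected version resumes the same pipeline at its own offset.
    let steps: &[fn(Value) -> Value] = match version {
        "v3" => &[v3_v4, v4_v5],
        "v4" => &[v4_v5],
        _ => &[],
    };
    let migrated = steps.iter().fold(metadata, |m, step| step(m));
    remove_querier_metadata(migrated)
}

fn main() {
    let out = migrate_from("v3", json!({"version": "v3", "querier_endpoint": "x"}));
    assert_eq!(out["version"], "v5");
    assert!(out.get("querier_endpoint").is_none());
    println!("{out}");
}
```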
diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs
index 3c32ff241..f6a194356 100644
--- a/server/src/migration/metadata_migration.rs
+++ b/server/src/migration/metadata_migration.rs
@@ -16,7 +16,6 @@
  *
  */
 
-use base64::Engine;
 use rand::distributions::DistString;
 use serde_json::{Map, Value as JsonValue};
@@ -149,7 +148,6 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
     storage_metadata
 }
 
-// maybe rename
 pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
     let metadata = storage_metadata.as_object_mut().unwrap();
     metadata.remove_entry("version");
@@ -174,27 +172,20 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
                 "server_mode".to_string(),
                 JsonValue::String(CONFIG.parseable.mode.to_string()),
             );
-            metadata.insert(
-                "querier_endpoint".to_string(),
-                JsonValue::String(CONFIG.parseable.address.clone()),
-            );
         }
         _ => (),
     },
     _ => (),
 }
-    metadata.insert(
-        "querier_auth_token".to_string(),
-        JsonValue::String(format!(
-            "Basic {}",
-            base64::prelude::BASE64_STANDARD.encode(format!(
-                "{}:{}",
-                CONFIG.parseable.username, CONFIG.parseable.password
-            ))
-        )),
-    );
+    storage_metadata
+}
 
+/// Remove the querier endpoint and auth token from the storage metadata
+pub fn remove_querier_metadata(mut storage_metadata: JsonValue) -> JsonValue {
+    let metadata = storage_metadata.as_object_mut().unwrap();
+    metadata.remove("querier_endpoint");
+    metadata.remove("querier_auth_token");
     storage_metadata
 }
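remove_querier_metadata is a plain key removal on the metadata object. A stand-alone check of that behavior with serde_json, mirroring the function body above; the sample document's fields are illustrative:

```rust
use serde_json::{json, Value};

fn remove_querier_metadata(mut storage_metadata: Value) -> Value {
    let metadata = storage_metadata.as_object_mut().unwrap();
    metadata.remove("querier_endpoint");
    metadata.remove("querier_auth_token");
    storage_metadata
}

fn main() {
    let before = json!({
        "version": "v5",
        "server_mode": "All",
        "querier_endpoint": "localhost:8000",
        "querier_auth_token": "Basic ..."
    });
    let after = remove_querier_metadata(before);
    // only the querier keys are gone; everything else is untouched
    assert!(after.get("querier_endpoint").is_none());
    assert!(after.get("querier_auth_token").is_none());
    assert_eq!(after["version"], "v5");
    println!("{after}");
}
```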
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 4dc108534..a018c2b1c 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -40,8 +40,7 @@ pub use localfs::FSConfig;
 pub use object_storage::{ObjectStorage, ObjectStorageProvider};
 pub use s3::S3Config;
 pub use store_metadata::{
-    get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
-    StorageMetadata,
+    put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
 };
 
 // metadata file names in a Stream prefix
diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs
index 6475b8fdc..c5491be6f 100644
--- a/server/src/storage/azure_blob.rs
+++ b/server/src/storage/azure_blob.rs
@@ -265,33 +265,31 @@ impl BlobStore {
     }
 
     async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let mut result_file_list: Vec<LogStream> = Vec::new();
         let resp = self.client.list_with_delimiter(None).await?;
-        let common_prefixes = resp.common_prefixes; // get all dirs
-
-        // return prefixes at the root level
-        let dirs: Vec<_> = common_prefixes
+
+        let streams = resp
+            .common_prefixes
             .iter()
-            .filter_map(|path| path.parts().next())
+            .flat_map(|path| path.parts())
             .map(|name| name.as_ref().to_string())
-            .filter(|x| x != PARSEABLE_ROOT_DIRECTORY)
-            .filter(|x| x != USERS_ROOT_DIR)
-            .collect();
-
-        let stream_json_check = FuturesUnordered::new();
+            .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR)
+            .collect::<Vec<_>>();
 
-        for dir in &dirs {
-            let key = format!(
-                "{}/{}/{}",
-                dir, STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME
-            );
-            let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
-            stream_json_check.push(task);
+        for stream in streams {
+            let stream_path =
+                object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY));
+            let resp = self.client.list_with_delimiter(Some(&stream_path)).await?;
+            // a prefix counts as a stream only if its root directory holds a stream.json
+            if resp
+                .objects
+                .iter()
+                .any(|name| name.location.filename().unwrap().ends_with("stream.json"))
+            {
+                result_file_list.push(LogStream { name: stream });
+            }
         }
-        stream_json_check.try_collect::<()>().await?;
-
-        Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+        Ok(result_file_list)
     }
 
     async fn _list_dates(&self, stream: &str) -> Result<Vec<String>, ObjectStorageError> {
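Note: `_list_streams` no longer HEADs a fixed `<stream>/<root>/stream.json` key per prefix; it lists each prefix's stream root and accepts any object whose name ends in `stream.json`. That means prefixes carrying only per-ingestor `.ingestor.*.stream.json` files still count as streams, and stray prefixes without any stream.json are skipped instead of failing the whole listing. A sketch of the shared check against the `object_store` API (hypothetical helper name; assumes the stream root directory is ".stream", mirroring STREAM_ROOT_DIRECTORY in the server):

    use object_store::{path::Path, ObjectStore};

    // Hypothetical helper mirroring the loop body shared by the S3 and
    // Azure listers above.
    async fn is_stream(
        client: &dyn ObjectStore,
        prefix: &str,
    ) -> Result<bool, object_store::Error> {
        let stream_root = Path::from(format!("{prefix}/.stream"));
        let resp = client.list_with_delimiter(Some(&stream_root)).await?;
        // a prefix is a stream iff its root holds some *stream.json,
        // including per-ingestor .ingestor.*.stream.json copies
        Ok(resp.objects.iter().any(|obj| {
            obj.location
                .filename()
                .map_or(false, |name| name.ends_with("stream.json"))
        }))
    }

This trades the previous concurrent HEAD-per-prefix probe (`FuturesUnordered`) for one sequential LIST per prefix: simpler and more permissive, at the cost of serialising one round trip per candidate prefix.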
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 7e2f7f609..ff2a56953 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -426,6 +426,117 @@ pub trait ObjectStorage: Sync + 'static {
             .await
     }
 
+    /// Create this node's stream.json from the querier's stream.json in storage
+    async fn create_stream_from_querier(
+        &self,
+        stream_name: &str,
+    ) -> Result<Bytes, ObjectStorageError> {
+        let stream_path = RelativePathBuf::from_iter([
+            stream_name,
+            STREAM_ROOT_DIRECTORY,
+            STREAM_METADATA_FILE_NAME,
+        ]);
+
+        if let Ok(querier_stream_json_bytes) = self.get_object(&stream_path).await {
+            let querier_stream_metadata =
+                serde_json::from_slice::<ObjectStoreFormat>(&querier_stream_json_bytes)?;
+            // reset stats and snapshot so this node starts with clean accounting
+            let stream_metadata = ObjectStoreFormat {
+                stats: FullStats::default(),
+                snapshot: Snapshot::default(),
+                ..querier_stream_metadata
+            };
+            let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into();
+            self.put_object(
+                &stream_json_path(stream_name),
+                stream_metadata_bytes.clone(),
+            )
+            .await?;
+            return Ok(stream_metadata_bytes);
+        }
+
+        Ok(Bytes::new())
+    }
+
+    /// Create this node's stream.json from an ingestor's stream.json in storage
+    async fn create_stream_from_ingestor(
+        &self,
+        stream_name: &str,
+    ) -> Result<Bytes, ObjectStorageError> {
+        let stream_path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
+        if let Some(stream_metadata_obs) = self
+            .get_objects(
+                Some(&stream_path),
+                Box::new(|file_name| {
+                    file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
+                }),
+            )
+            .await
+            .into_iter()
+            .next()
+        {
+            if !stream_metadata_obs.is_empty() {
+                let stream_metadata_bytes = &stream_metadata_obs[0];
+                let stream_ob_metadata =
+                    serde_json::from_slice::<ObjectStoreFormat>(stream_metadata_bytes)?;
+                // reset stats and snapshot so this node starts with clean accounting
+                let stream_metadata = ObjectStoreFormat {
+                    stats: FullStats::default(),
+                    snapshot: Snapshot::default(),
+                    ..stream_ob_metadata
+                };
+
+                let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into();
+                self.put_object(
+                    &stream_json_path(stream_name),
+                    stream_metadata_bytes.clone(),
+                )
+                .await?;
+
+                return Ok(stream_metadata_bytes);
+            }
+        }
+        Ok(Bytes::new())
+    }
+
+    /// Create this node's schema file from the querier's schema in storage
+    async fn create_schema_from_querier(
+        &self,
+        stream_name: &str,
+    ) -> Result<Bytes, ObjectStorageError> {
+        let path =
+            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
+        if let Ok(querier_schema_bytes) = self.get_object(&path).await {
+            self.put_object(&schema_path(stream_name), querier_schema_bytes.clone())
+                .await?;
+            return Ok(querier_schema_bytes);
+        }
+        Ok(Bytes::new())
+    }
+
+    /// Create this node's schema file from an ingestor's schema in storage
+    async fn create_schema_from_ingestor(
+        &self,
+        stream_name: &str,
+    ) -> Result<Bytes, ObjectStorageError> {
+        let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
+        if let Some(schema_obs) = self
+            .get_objects(
+                Some(&path),
+                Box::new(|file_name| {
+                    file_name.starts_with(".ingestor") && file_name.ends_with("schema")
+                }),
+            )
+            .await
+            .into_iter()
+            .next()
+        {
+            let schema_ob = &schema_obs[0];
+            self.put_object(&schema_path(stream_name), schema_ob.clone())
+                .await?;
+            return Ok(schema_ob.clone());
+        }
+        Ok(Bytes::new())
+    }
+
     async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
         if !Path::new(&CONFIG.staging_dir()).exists() {
             return Ok(());
@@ -567,8 +678,7 @@ pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
         .expect("serialize cannot fail")
 }
 
-#[inline(always)]
-fn schema_path(stream_name: &str) -> RelativePathBuf {
+pub fn schema_path(stream_name: &str) -> RelativePathBuf {
     match CONFIG.parseable.mode {
         Mode::Ingest => {
             let file_name = format!(
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 0d6513437..6a546a148 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -393,33 +393,31 @@ impl S3 {
     }
 
     async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let mut result_file_list: Vec<LogStream> = Vec::new();
         let resp = self.client.list_with_delimiter(None).await?;
-        let common_prefixes = resp.common_prefixes; // get all dirs
-
-        // return prefixes at the root level
-        let dirs: Vec<_> = common_prefixes
+
+        let streams = resp
+            .common_prefixes
             .iter()
-            .filter_map(|path| path.parts().next())
+            .flat_map(|path| path.parts())
            .map(|name| name.as_ref().to_string())
-            .filter(|x| x != PARSEABLE_ROOT_DIRECTORY)
-            .filter(|x| x != USERS_ROOT_DIR)
-            .collect();
-
-        let stream_json_check = FuturesUnordered::new();
+            .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR)
+            .collect::<Vec<_>>();
 
-        for dir in &dirs {
-            let key = format!(
-                "{}/{}/{}",
-                dir, STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME
-            );
-            let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
-            stream_json_check.push(task);
+        for stream in streams {
+            let stream_path =
+                object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY));
+            let resp = self.client.list_with_delimiter(Some(&stream_path)).await?;
+            // a prefix counts as a stream only if its root directory holds a stream.json
+            if resp
+                .objects
+                .iter()
+                .any(|name| name.location.filename().unwrap().ends_with("stream.json"))
+            {
+                result_file_list.push(LogStream { name: stream });
+            }
         }
-        stream_json_check.try_collect::<()>().await?;
-
-        Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+        Ok(result_file_list)
     }
 
     async fn _list_dates(&self, stream: &str) -> Result<Vec<String>, ObjectStorageError> {
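Note: the four `create_*_from_{querier,ingestor}` trait methods return `Bytes::new()` rather than an error when nothing is found, so callers chain them by testing `is_empty()`, as `migration_stream` does above. A condensed sketch of that fallback order (in-crate pseudocode; `bootstrap_stream_json` is a hypothetical wrapper name, not part of this diff):

    use bytes::Bytes;

    use crate::storage::ObjectStorage; // re-exported in storage.rs above

    // Hypothetical wrapper over the trait methods added above, showing the
    // querier-first, ingestor-second fallback that migration_stream relies on.
    async fn bootstrap_stream_json(storage: &dyn ObjectStorage, stream_name: &str) -> Bytes {
        let from_querier = storage
            .create_stream_from_querier(stream_name)
            .await
            .unwrap_or_default();
        if !from_querier.is_empty() {
            // found and already persisted under this node's stream.json path
            from_querier
        } else {
            // no querier copy in storage; fall back to any ingestor's copy
            storage
                .create_stream_from_ingestor(stream_name)
                .await
                .unwrap_or_default()
        }
    }

Both stream helpers also reset `stats` and `snapshot` to defaults before persisting, so the adopting node starts with clean accounting rather than inheriting another node's counters.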
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index 686d7e0ce..54735ab71 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -22,7 +22,6 @@ use std::{
     path::PathBuf,
 };
 
-use base64::Engine;
 use bytes::Bytes;
 use once_cell::sync::OnceCell;
 use relative_path::RelativePathBuf;
@@ -64,29 +63,10 @@ pub struct StorageMetadata {
     pub roles: HashMap<String, Vec<DefaultPrivilege>>,
     #[serde(default)]
     pub default_role: Option<String>,
-    pub querier_endpoint: Option<String>,
-    pub querier_auth_token: Option<String>,
 }
 
 impl StorageMetadata {
     pub fn new() -> Self {
-        let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode {
-            Mode::All | Mode::Query => {
-                let querier_auth_token = format!(
-                    "Basic {}",
-                    base64::prelude::BASE64_STANDARD.encode(format!(
-                        "{}:{}",
-                        CONFIG.parseable.username, CONFIG.parseable.password
-                    ))
-                );
-                (
-                    Some(CONFIG.parseable.address.clone()),
-                    Some(querier_auth_token),
-                )
-            }
-            Mode::Ingest => (None, None),
-        };
-
         Self {
             version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
             mode: CONFIG.storage_name.to_owned(),
@@ -98,8 +78,6 @@ impl StorageMetadata {
             streams: Vec::new(),
             roles: HashMap::default(),
             default_role: None,
-            querier_endpoint,
-            querier_auth_token,
         }
     }
 
     pub fn global() -> &'static StaticStorageMetadata {
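Note: dropping `querier_endpoint` and `querier_auth_token` from `StorageMetadata` stays wire-compatible with metadata written before this change, because serde ignores unknown JSON fields by default; the migration above then scrubs the stale keys from the stored copy. A trimmed stand-in demonstrating the round-trip (field set abbreviated, values illustrative):

    use serde::{Deserialize, Serialize};
    use serde_json::json;

    // Abbreviated stand-in for the slimmed StorageMetadata struct above.
    #[derive(Serialize, Deserialize, Debug)]
    struct StorageMetadata {
        version: String,
        mode: String,
        #[serde(default)]
        default_role: Option<String>,
    }

    fn main() {
        // old on-disk metadata still carrying the removed querier keys
        let old = json!({
            "version": "v5",
            "mode": "s3",
            "default_role": null,
            "querier_endpoint": "localhost:8000",
            "querier_auth_token": "Basic dXNlcjpwYXNz"
        });
        // unknown fields are ignored, so deserialization still succeeds
        let parsed: StorageMetadata = serde_json::from_value(old).unwrap();
        println!("{parsed:?}");
    }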