From 4a9683187456147aa8879a0e6b4cb2d1fdb11bfe Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 25 Nov 2024 00:24:01 +0530
Subject: [PATCH 1/8] fix: ingestor-querier sync

This PR makes the following changes:

1. removed the querier endpoint and auth token from parseable.json
2. added migration steps to update parseable.json for the latest release
3. removed the ingestor-to-querier sync when ingestion creates a new stream
4. updated the logic that lists streams from storage
5. updated the stream migration logic that runs at server start
6. updated the places where the querier/ingestors need to check that a stream exists in S3 before confirming the stream's existence
---
 server/src/handlers/http/cluster/mod.rs       |  71 +---------
 server/src/handlers/http/ingest.rs            |  68 ++++------
 server/src/handlers/http/logstream.rs         |  61 ++++++---
 .../http/modal/ingest/ingester_logstream.rs   |  10 +-
 .../http/modal/query/querier_logstream.rs     |  22 +--
 .../http/modal/utils/logstream_utils.rs       | 126 +++++++++++++++++-
 server/src/handlers/http/query.rs             |  32 ++++-
 server/src/metadata.rs                        |  60 ++-------
 server/src/migration.rs                       |  88 ++++++++++--
 server/src/migration/metadata_migration.rs    |  22 +--
 server/src/storage.rs                         |   5 +-
 server/src/storage/object_storage.rs          |   3 +-
 server/src/storage/s3.rs                      |  43 +++---
 server/src/storage/store_metadata.rs          |  22 ---
 14 files changed, 351 insertions(+), 282 deletions(-)

diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index e438ab8f4..da5908dbc 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -30,7 +30,6 @@ use crate::metrics::prom_utils::Metrics;
 use crate::rbac::role::model::DefaultPrivilege;
 use crate::rbac::user::User;
 use crate::stats::Stats;
-use crate::storage::get_staging_metadata;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
 use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -65,7 +64,6 @@ pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
     body: Bytes,
     stream_name: &str,
-    skip_ingestor: Option<String>,
 ) -> Result<(), StreamError> {
     let mut reqwest_headers = http_header::HeaderMap::new();
 
@@ -79,15 +77,7 @@ pub async fn sync_streams_with_ingestors(
 
     let client = reqwest::Client::new();
 
-    let final_ingestor_infos = match skip_ingestor {
-        None => ingestor_infos,
-        Some(skip_ingestor) => ingestor_infos
-            .into_iter()
-            .filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
-            .collect::<Vec<_>>(),
-    };
-
-    for ingestor in final_ingestor_infos {
+    for ingestor in ingestor_infos {
         if !utils::check_liveness(&ingestor.domain_name).await {
             log::warn!("Ingestor {} is not live", ingestor.domain_name);
             continue;
@@ -852,62 +842,3 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
 
     Ok(())
 }
-
-pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
-    let client = reqwest::Client::new();
-
-    let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
-        StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
-    })?;
-    let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
-    let token = staging_metadata.querier_auth_token.unwrap();
-
-    if !check_liveness(&querier_endpoint).await {
-        log::warn!("Querier {} is not live", querier_endpoint);
-        return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
-    }
-
-    let url = format!(
-        "{}{}/logstream/{}?skip_ingestors={}",
-        
querier_endpoint, - base_path_without_preceding_slash(), - stream_name, - CONFIG.parseable.ingestor_endpoint, - ); - - let response = client - .put(&url) - .header(header::AUTHORIZATION, &token) - .send() - .await - .map_err(|err| { - log::error!( - "Fatal: failed to forward create stream request to querier: {}\n Error: {:?}", - &url, - err - ); - StreamError::Network(err) - })?; - - let status = response.status(); - - if !status.is_success() { - let response_text = response.text().await.map_err(|err| { - log::error!("Failed to read response text from querier: {}", &url); - StreamError::Network(err) - })?; - - log::error!( - "Failed to forward create stream request to querier: {}\nResponse Returned: {:?}", - &url, - response_text - ); - - return Err(StreamError::Anyhow(anyhow::anyhow!( - "Request failed with status: {}", - status, - ))); - } - - Ok(()) -} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 48ec1d9dc..363f362f2 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -26,11 +26,11 @@ use crate::event::{ error::EventError, format::{self, EventFormat}, }; -use crate::handlers::http::cluster::forward_create_stream_request; +use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY}; use crate::localcache::CacheError; use crate::metadata::error::stream_info::MetadataError; -use crate::metadata::{self, STREAM_INFO}; +use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; use crate::storage::{LogStream, ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; @@ -153,7 +153,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { - super::logstream::create_stream( - stream_name.to_string(), - "", - "", - "", - "", - Arc::new(Schema::empty()), - stream_type, - ) - .await?; - } - Mode::Ingest => { - // here the ingest server has not found the stream - // so it should check if the stream exists in storage - let store = CONFIG.storage().get_object_store(); - let streams = store.list_streams().await?; - if !streams.contains(&LogStream { - name: stream_name.to_owned(), - }) { - match forward_create_stream_request(stream_name).await { - Ok(()) => log::info!("Stream {} created", stream_name), - Err(e) => { - return Err(PostError::Invalid(anyhow::anyhow!( - "Unable to create stream: {} using query server. 
Error: {}", - stream_name, - e.to_string(), - ))) - } - }; - } - metadata::STREAM_INFO - .upsert_stream_info( - &*store, - LogStream { - name: stream_name.to_owned(), - }, - ) - .await - .map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?; + + if CONFIG.parseable.mode != Mode::All { + let store = CONFIG.storage().get_object_store(); + let streams = store.list_streams().await?; + if streams.contains(&LogStream { + name: stream_name.to_owned(), + }) { + create_stream_and_schema_from_storage(stream_name).await?; + return Ok(stream_exists); } } + + super::logstream::create_stream( + stream_name.to_string(), + "", + "", + "", + "", + Arc::new(Schema::empty()), + stream_type, + ) + .await?; + Ok(stream_exists) } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 00ddff3f3..19ce86455 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -20,17 +20,19 @@ use self::error::{CreateStreamError, StreamError}; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; -use super::modal::utils::logstream_utils::create_update_stream; +use super::modal::utils::logstream_utils::{ + create_stream_and_schema_from_storage, create_update_stream, +}; use crate::alerts::Alerts; use crate::event::format::update_data_type_to_datetime; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; -use crate::option::CONFIG; +use crate::option::{Mode, CONFIG}; use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; use crate::storage::StreamType; -use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; +use crate::storage::{retention::Retention, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; @@ -82,11 +84,12 @@ pub async fn delete(req: HttpRequest) -> Result { } pub async fn list(_: HttpRequest) -> impl Responder { - let res: Vec = STREAM_INFO + let res = CONFIG + .storage() + .get_object_store() .list_streams() - .into_iter() - .map(|stream| LogStream { name: stream }) - .collect(); + .await + .unwrap(); web::Json(res) } @@ -180,7 +183,11 @@ pub async fn put_alert( validator::alert(&alerts)?; if !STREAM_INFO.stream_initialized(&stream_name)? 
{ - return Err(StreamError::UninitializedLogstream); + if CONFIG.parseable.mode == Mode::Query { + create_stream_and_schema_from_storage(&stream_name).await?; + } else { + return Err(StreamError::UninitializedLogstream); + } } let schema = STREAM_INFO.schema(&stream_name)?; @@ -218,7 +225,11 @@ pub async fn put_alert( pub async fn get_retention(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name.to_string())); + if CONFIG.parseable.mode == Mode::Query { + create_stream_and_schema_from_storage(&stream_name).await?; + } else { + return Err(StreamError::StreamNotFound(stream_name)); + } } let retention = STREAM_INFO.get_retention(&stream_name); @@ -328,7 +339,11 @@ pub async fn get_stats(req: HttpRequest) -> Result let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); + if CONFIG.parseable.mode == Mode::Query { + create_stream_and_schema_from_storage(&stream_name).await?; + } else { + return Err(StreamError::StreamNotFound(stream_name)); + } } let query_string = req.query_string(); @@ -497,7 +512,11 @@ pub async fn create_stream( pub async fn get_stream_info(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); + if CONFIG.parseable.mode == Mode::Query { + create_stream_and_schema_from_storage(&stream_name).await?; + } else { + return Err(StreamError::StreamNotFound(stream_name)); + } } let store = CONFIG.storage().get_object_store(); @@ -540,7 +559,11 @@ pub async fn put_stream_hot_tier( ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); + if CONFIG.parseable.mode == Mode::Query { + create_stream_and_schema_from_storage(&stream_name).await?; + } else { + return Err(StreamError::StreamNotFound(stream_name)); + } } if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { @@ -590,7 +613,11 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result Result Result<(), StreamError> { header::CONTENT_TYPE, HeaderValue::from_static("application/json"), ); - sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?; + sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?; } Ok(()) } diff --git a/server/src/handlers/http/modal/ingest/ingester_logstream.rs b/server/src/handlers/http/modal/ingest/ingester_logstream.rs index f5ece7487..234d47144 100644 --- a/server/src/handlers/http/modal/ingest/ingester_logstream.rs +++ b/server/src/handlers/http/modal/ingest/ingester_logstream.rs @@ -7,7 +7,10 @@ use crate::{ catalog::remove_manifest_from_snapshot, event, handlers::http::{ - logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream, + logstream::error::StreamError, + modal::utils::logstream_utils::{ + create_stream_and_schema_from_storage, create_update_stream, + }, }, metadata::{self, STREAM_INFO}, option::CONFIG, @@ -22,8 +25,7 @@ pub async fn retention_cleanup( let stream_name: String = 
req.match_info().get("logstream").unwrap().parse().unwrap(); let storage = CONFIG.storage().get_object_store(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - log::error!("Stream {} not found", stream_name.clone()); - return Err(StreamError::StreamNotFound(stream_name.clone())); + create_stream_and_schema_from_storage(&stream_name).await?; } let date_list: Vec = serde_json::from_slice(&body).unwrap(); let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; @@ -40,7 +42,7 @@ pub async fn retention_cleanup( pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); + create_stream_and_schema_from_storage(&stream_name).await?; } metadata::STREAM_INFO.delete_stream(&stream_name); diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index c67d173ae..13bdaede2 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -5,7 +5,6 @@ use actix_web::{web, HttpRequest, Responder}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; -use serde::Deserialize; use tokio::sync::Mutex; static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); @@ -20,7 +19,9 @@ use crate::{ utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, }, logstream::{error::StreamError, get_stats_date}, - modal::utils::logstream_utils::create_update_stream, + modal::utils::logstream_utils::{ + create_stream_and_schema_from_storage, create_update_stream, + }, }, hottier::HotTierManager, metadata::{self, STREAM_INFO}, @@ -32,7 +33,7 @@ use crate::{ pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); + create_stream_and_schema_from_storage(&stream_name).await?; } let objectstore = CONFIG.storage().get_object_store(); @@ -79,22 +80,13 @@ pub async fn delete(req: HttpRequest) -> Result { Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } -#[derive(Deserialize)] -pub struct PutStreamQuery { - skip_ingestors: Option, -} - -pub async fn put_stream( - req: HttpRequest, - body: Bytes, - info: web::Query, -) -> Result { +pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let _ = CREATE_STREAM_LOCK.lock().await; let headers = create_update_stream(&req, &body, &stream_name).await?; - sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?; + sync_streams_with_ingestors(headers, body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } @@ -103,7 +95,7 @@ pub async fn get_stats(req: HttpRequest) -> Result let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); + create_stream_and_schema_from_storage(&stream_name).await?; } let query_string = req.query_string(); diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs index 65628eca5..6b023be9e 100644 --- 
a/server/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/server/src/handlers/http/modal/utils/logstream_utils.rs
@@ -4,17 +4,23 @@ use actix_web::{http::header::HeaderMap, HttpRequest};
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use http::StatusCode;
+use relative_path::RelativePathBuf;
 
 use crate::{
+    catalog::snapshot::Snapshot,
     handlers::{
         http::logstream::error::{CreateStreamError, StreamError},
         CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
         TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
     },
     metadata::{self, STREAM_INFO},
-    option::CONFIG,
+    option::{Mode, CONFIG},
     static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
-    storage::StreamType,
+    stats::FullStats,
+    storage::{
+        object_storage::{schema_path, stream_json_path},
+        LogStream, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY,
+    },
     validator,
 };
 
@@ -41,6 +47,16 @@ pub async fn create_update_stream(
         });
     }
 
+    if CONFIG.parseable.mode == Mode::Query {
+        create_stream_and_schema_from_storage(stream_name).await?;
+        return Err(StreamError::Custom {
+            msg: format!(
+                "Logstream {stream_name} already exists, please create a new log stream with unique name"
+            ),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+
     if update_stream == "true" {
         if !STREAM_INFO.stream_exists(stream_name) {
             return Err(StreamError::StreamNotFound(stream_name.to_string()));
@@ -378,3 +394,109 @@ pub async fn create_stream(
     }
     Ok(())
 }
+
+pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<(), StreamError> {
+    // Proceed to create log stream if it doesn't exist
+    let storage = CONFIG.storage().get_object_store();
+    let streams = storage.list_streams().await?;
+    if streams.contains(&LogStream {
+        name: stream_name.to_owned(),
+    }) {
+        let mut stream_metadata = ObjectStoreFormat::default();
+        let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
+        let stream_obs = storage
+            .get_objects(
+                Some(&path),
+                Box::new(|file_name| {
+                    file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
+                }),
+            )
+            .await
+            .into_iter()
+            .next();
+        if let Some(stream_obs) = stream_obs {
+            let stream_ob = &stream_obs[0];
+            let stream_ob_metdata = serde_json::from_slice::<ObjectStoreFormat>(stream_ob)?;
+            stream_metadata = ObjectStoreFormat {
+                stats: FullStats::default(),
+                snapshot: Snapshot::default(),
+                ..stream_ob_metdata
+            };
+
+            let stream_metadata_bytes = serde_json::to_vec(&stream_metadata)?.into();
+            storage
+                .put_object(&stream_json_path(stream_name), stream_metadata_bytes)
+                .await?;
+        }
+
+        let mut schema = Arc::new(Schema::empty());
+        let schema_obs = storage
+            .get_objects(
+                Some(&path),
+                Box::new(|file_name| {
+                    file_name.starts_with(".ingestor") && file_name.ends_with("schema")
+                }),
+            )
+            .await
+            .into_iter()
+            .next();
+        if let Some(schema_obs) = schema_obs {
+            let schema_ob = &schema_obs[0];
+            storage
+                .put_object(&schema_path(stream_name), schema_ob.clone())
+                .await?;
+            schema = serde_json::from_slice::<Arc<Schema>>(schema_ob)?;
+        }
+
+        let mut static_schema: HashMap<String, Arc<Field>> = HashMap::new();
+
+        for (field_name, field) in schema
+            .fields()
+            .iter()
+            .map(|field| (field.name().to_string(), field.clone()))
+        {
+            static_schema.insert(field_name, field);
+        }
+
+        let time_partition = if stream_metadata.time_partition.is_some() {
+            stream_metadata.time_partition.as_ref().unwrap()
+        } else {
+            ""
+        };
+        let time_partition_limit = if stream_metadata.time_partition_limit.is_some() {
+            stream_metadata.time_partition_limit.as_ref().unwrap()
+        } else {
+            ""
+        };
+        let 
custom_partition = if stream_metadata.custom_partition.is_some() { + stream_metadata.custom_partition.as_ref().unwrap() + } else { + "" + }; + let static_schema_flag = if stream_metadata.static_schema_flag.is_some() { + stream_metadata.static_schema_flag.as_ref().unwrap() + } else { + "" + }; + let stream_type = if stream_metadata.stream_type.is_some() { + stream_metadata.stream_type.as_ref().unwrap() + } else { + "" + }; + + metadata::STREAM_INFO.add_stream( + stream_name.to_string(), + stream_metadata.created_at, + time_partition.to_string(), + time_partition_limit.to_string(), + custom_partition.to_string(), + static_schema_flag.to_string(), + static_schema, + stream_type, + ); + } else { + return Err(StreamError::StreamNotFound(stream_name.to_string())); + } + + Ok(()) +} diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 8fe9c8229..5115c1a8c 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -33,6 +33,7 @@ use std::time::Instant; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; +use crate::metadata::STREAM_INFO; use arrow_array::RecordBatch; use crate::event::commit_schema; @@ -51,6 +52,8 @@ use crate::storage::object_storage::commit_schema_to_storage; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; +use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage; + /// Query Request through http endpoint. #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] @@ -68,12 +71,18 @@ pub struct Query { pub async fn query(req: HttpRequest, query_request: Query) -> Result { let session_state = QUERY_SESSION.state(); - - // get the logical plan and extract the table name - let raw_logical_plan = session_state + let raw_logical_plan = match session_state .create_logical_plan(&query_request.query) - .await?; - + .await + { + Ok(raw_logical_plan) => raw_logical_plan, + Err(_) => { + create_streams_for_querier().await; + session_state + .create_logical_plan(&query_request.query) + .await? 
+ } + }; // create a visitor to extract the table name let mut visitor = TableScanVisitor::default(); let _ = raw_logical_plan.visit(&mut visitor); @@ -178,6 +187,19 @@ pub async fn update_schema_when_distributed(tables: Vec) -> Result<(), Q Ok(()) } +pub async fn create_streams_for_querier() { + let querier_streams = STREAM_INFO.list_streams(); + let store = CONFIG.storage().get_object_store(); + let storage_streams = store.list_streams().await.unwrap(); + for stream in storage_streams { + let stream_name = stream.name; + + if !querier_streams.contains(&stream_name) { + let _ = create_stream_and_schema_from_storage(&stream_name).await; + } + } +} + #[allow(clippy::too_many_arguments)] pub async fn put_results_in_cache( cache_results: Option<&str>, diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 21b5e100c..b582418ba 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -21,7 +21,6 @@ use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use chrono::{Local, NaiveDateTime}; use itertools::Itertools; use once_cell::sync::Lazy; -use relative_path::RelativePathBuf; use serde_json::Value; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -33,12 +32,8 @@ use crate::metrics::{ EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, }; -use crate::option::{Mode, CONFIG}; use crate::storage::retention::Retention; -use crate::storage::{ - LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType, STREAM_METADATA_FILE_NAME, - STREAM_ROOT_DIRECTORY, -}; +use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType}; use crate::utils::arrow::MergedRecordReader; use derive_more::{Deref, DerefMut}; @@ -467,43 +462,6 @@ pub async fn load_stream_metadata_on_server_start( } let schema = update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?; - let mut retention = meta.retention.clone(); - let mut time_partition = meta.time_partition.clone(); - let mut time_partition_limit = meta.time_partition_limit.clone(); - let mut custom_partition = meta.custom_partition.clone(); - let mut cache_enabled = meta.cache_enabled; - let mut static_schema_flag = meta.static_schema_flag.clone(); - let mut stream_type = meta.stream_type.clone(); - if CONFIG.parseable.mode == Mode::Ingest { - storage.put_schema(stream_name, &schema).await?; - // get the base stream metadata - let bytes = storage - .get_object(&RelativePathBuf::from_iter([ - stream_name, - STREAM_ROOT_DIRECTORY, - STREAM_METADATA_FILE_NAME, - ])) - .await?; - let querier_meta: ObjectStoreFormat = serde_json::from_slice(&bytes).unwrap(); - retention.clone_from(&querier_meta.retention); - time_partition.clone_from(&querier_meta.time_partition); - time_partition_limit.clone_from(&querier_meta.time_partition_limit); - custom_partition.clone_from(&querier_meta.custom_partition); - cache_enabled.clone_from(&querier_meta.cache_enabled); - static_schema_flag.clone_from(&querier_meta.static_schema_flag); - stream_type.clone_from(&querier_meta.stream_type); - meta = ObjectStoreFormat { - retention: retention.clone(), - cache_enabled, - time_partition: time_partition.clone(), - time_partition_limit: time_partition_limit.clone(), - custom_partition: custom_partition.clone(), - static_schema_flag: static_schema_flag.clone(), - stream_type: stream_type.clone(), - ..meta.clone() - }; - storage.put_stream_manifest(stream_name, &meta).await?; - } //load stats from storage let stats = meta.stats; @@ -522,14 
+480,14 @@ pub async fn load_stream_metadata_on_server_start( let metadata = LogStreamMetadata { schema, alerts, - retention, - cache_enabled, - created_at: meta.created_at.clone(), - first_event_at: meta.first_event_at.clone(), - time_partition: meta.time_partition.clone(), - time_partition_limit, - custom_partition, - static_schema_flag: meta.static_schema_flag.clone(), + retention: meta.retention, + cache_enabled: meta.cache_enabled, + created_at: meta.created_at, + first_event_at: meta.first_event_at, + time_partition: meta.time_partition, + time_partition_limit: meta.time_partition_limit, + custom_partition: meta.custom_partition, + static_schema_flag: meta.static_schema_flag, hot_tier_enabled: meta.hot_tier_enabled, stream_type: meta.stream_type, }; diff --git a/server/src/migration.rs b/server/src/migration.rs index 2b1d8a5e5..804e77fa0 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -24,13 +24,15 @@ mod stream_metadata_migration; use std::{fs::OpenOptions, sync::Arc}; use crate::{ + catalog::snapshot::Snapshot, hottier::{HotTierManager, CURRENT_HOT_TIER_VERSION}, metadata::load_stream_metadata_on_server_start, option::{validation::human_size_to_bytes, Config, Mode, CONFIG}, + stats::FullStats, storage::{ - object_storage::{parseable_json_path, stream_json_path}, - ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, + object_storage::{parseable_json_path, schema_path, stream_json_path}, + ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME, + PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, }, }; use arrow_schema::Schema; @@ -82,7 +84,10 @@ pub async fn run_metadata_migration( let metadata = metadata_migration::v4_v5(storage_metadata); put_remote_metadata(&*object_store, &metadata).await?; } - _ => (), + _ => { + let metadata = metadata_migration::remove_querier_metadata(storage_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } } } @@ -114,7 +119,6 @@ pub async fn run_metadata_migration( pub async fn run_migration(config: &Config) -> anyhow::Result<()> { let storage = config.storage().get_object_store(); let streams = storage.list_streams().await?; - for stream in streams { migration_stream(&stream.name, &*storage).await?; if CONFIG.parseable.hot_tier_storage_path.is_some() { @@ -156,11 +160,74 @@ async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> { async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> { let mut arrow_schema: Schema = Schema::empty(); - let schema_path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); - let schema = storage.get_object(&schema_path).await?; + let query_schema_path = + RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); + let schema = if let Ok(schema) = storage.get_object(&query_schema_path).await { + schema + } else { + let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]); + let schema_obs = storage + .get_objects( + Some(&path), + Box::new(|file_name| { + file_name.starts_with(".ingestor") && file_name.ends_with("schema") + }), + ) + .await + .into_iter() + .next(); + if let Some(schema_obs) = schema_obs { + let schema_ob = &schema_obs[0]; + storage + .put_object(&schema_path(stream), schema_ob.clone()) + .await?; + schema_ob.clone() + } else { + Bytes::new() + } + }; let path = stream_json_path(stream); - let stream_metadata = 
storage.get_object(&path).await.unwrap_or_default(); + let stream_metadata = if let Ok(stream_metadata) = storage.get_object(&path).await { + stream_metadata + } else if CONFIG.parseable.mode != Mode::All { + let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]); + let stream_metadata_obs = storage + .get_objects( + Some(&path), + Box::new(|file_name| { + file_name.starts_with(".ingestor") && file_name.ends_with("stream.json") + }), + ) + .await + .into_iter() + .next(); + if let Some(stream_metadata_obs) = stream_metadata_obs { + if !stream_metadata_obs.is_empty() { + let stream_metadata_bytes = &stream_metadata_obs[0]; + let stream_ob_metdata = + serde_json::from_slice::(stream_metadata_bytes)?; + let stream_metadata = ObjectStoreFormat { + stats: FullStats::default(), + snapshot: Snapshot::default(), + ..stream_ob_metdata + }; + + let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into(); + storage + .put_object(&stream_json_path(stream), stream_metadata_bytes.clone()) + .await?; + + stream_metadata_bytes + } else { + Bytes::new() + } + } else { + Bytes::new() + } + } else { + Bytes::new() + }; let mut stream_meta_found = true; if stream_metadata.is_empty() { if CONFIG.parseable.mode != Mode::Ingest { @@ -172,7 +239,6 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: if stream_meta_found { stream_metadata_value = serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); - let version = stream_metadata_value .as_object() .and_then(|meta| meta.get("version")) @@ -189,7 +255,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: let schema = serde_json::from_slice(&schema).ok(); arrow_schema = schema_migration::v1_v4(schema)?; storage - .put_object(&schema_path, to_bytes(&arrow_schema)) + .put_object(&query_schema_path, to_bytes(&arrow_schema)) .await?; } Some("v2") => { @@ -203,7 +269,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: let schema = serde_json::from_slice(&schema)?; arrow_schema = schema_migration::v2_v4(schema)?; storage - .put_object(&schema_path, to_bytes(&arrow_schema)) + .put_object(&query_schema_path, to_bytes(&arrow_schema)) .await?; } Some("v3") => { diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index 3c32ff241..0eb9363a8 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -16,7 +16,6 @@ * */ -use base64::Engine; use rand::distributions::DistString; use serde_json::{Map, Value as JsonValue}; @@ -149,7 +148,6 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue { storage_metadata } -// maybe rename pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue { let metadata = storage_metadata.as_object_mut().unwrap(); metadata.remove_entry("version"); @@ -174,27 +172,19 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue { "server_mode".to_string(), JsonValue::String(CONFIG.parseable.mode.to_string()), ); - metadata.insert( - "querier_endpoint".to_string(), - JsonValue::String(CONFIG.parseable.address.clone()), - ); } _ => (), }, _ => (), } - metadata.insert( - "querier_auth_token".to_string(), - JsonValue::String(format!( - "Basic {}", - base64::prelude::BASE64_STANDARD.encode(format!( - "{}:{}", - CONFIG.parseable.username, CONFIG.parseable.password - )) - )), - ); + storage_metadata +} +pub fn remove_querier_metadata(mut storage_metadata: JsonValue) -> JsonValue { + let metadata = 
storage_metadata.as_object_mut().unwrap();
+    metadata.remove("querier_endpoint");
+    metadata.remove("querier_auth_token");
     storage_metadata
 }
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 4dc108534..b06dd368f 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -40,8 +40,7 @@ pub use localfs::FSConfig;
 pub use object_storage::{ObjectStorage, ObjectStorageProvider};
 pub use s3::S3Config;
 pub use store_metadata::{
-    get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
-    StorageMetadata,
+    put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
 };
 
 // metadata file names in a Stream prefix
@@ -203,7 +202,7 @@ impl ObjectStoreFormat {
     }
 }
 
-#[derive(serde::Serialize, PartialEq)]
+#[derive(serde::Serialize, PartialEq, Debug)]
 pub struct LogStream {
     pub name: String,
 }
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 7e2f7f609..2359efd6f 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -567,8 +567,7 @@ pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
         .expect("serialize cannot fail")
 }
 
-#[inline(always)]
-fn schema_path(stream_name: &str) -> RelativePathBuf {
+pub fn schema_path(stream_name: &str) -> RelativePathBuf {
     match CONFIG.parseable.mode {
         Mode::Ingest => {
             let file_name = format!(
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 0d6513437..400f305b8 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -393,33 +393,32 @@ impl S3 {
     }
 
     async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let mut result_file_list: Vec<LogStream> = Vec::new();
         let resp = self.client.list_with_delimiter(None).await?;
-        let common_prefixes = resp.common_prefixes; // get all dirs
-
-        // return prefixes at the root level
-        let dirs: Vec<_> = common_prefixes
+        let streams = resp
+            .common_prefixes
             .iter()
-            .filter_map(|path| path.parts().next())
+            .flat_map(|path| path.parts())
             .map(|name| name.as_ref().to_string())
-            .filter(|x| x != PARSEABLE_ROOT_DIRECTORY)
-            .filter(|x| x != USERS_ROOT_DIR)
-            .collect();
-
-        let stream_json_check = FuturesUnordered::new();
-
-        for dir in &dirs {
-            let key = format!(
-                "{}/{}/{}",
-                dir, STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME
-            );
-            let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
-            stream_json_check.push(task);
+            .filter(|name| name != PARSEABLE_ROOT_DIRECTORY)
+            .filter(|name| name != USERS_ROOT_DIR)
+            .collect::<Vec<_>>();
+        for stream in streams {
+            let stream_path =
+                object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY));
+            let resp = self.client.list_with_delimiter(Some(&stream_path)).await?;
+            let stream_files: Vec<String> = resp
+                .objects
+                .iter()
+                .filter(|name| name.location.filename().unwrap().ends_with("stream.json"))
+                .map(|name| name.location.to_string())
+                .collect();
+            if !stream_files.is_empty() {
+                result_file_list.push(LogStream { name: stream });
+            }
         }
-
-        stream_json_check.try_collect::<()>().await?;
-
-        Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+        Ok(result_file_list)
     }
 
     async fn _list_dates(&self, stream: &str) -> Result<Vec<String>, ObjectStorageError> {
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index 686d7e0ce..54735ab71 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -22,7 +22,6 @@ use std::{
     path::PathBuf,
 };
 
-use base64::Engine;
 use 
bytes::Bytes; use once_cell::sync::OnceCell; use relative_path::RelativePathBuf; @@ -64,29 +63,10 @@ pub struct StorageMetadata { pub roles: HashMap>, #[serde(default)] pub default_role: Option, - pub querier_endpoint: Option, - pub querier_auth_token: Option, } impl StorageMetadata { pub fn new() -> Self { - let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode { - Mode::All | Mode::Query => { - let querier_auth_token = format!( - "Basic {}", - base64::prelude::BASE64_STANDARD.encode(format!( - "{}:{}", - CONFIG.parseable.username, CONFIG.parseable.password - )) - ); - ( - Some(CONFIG.parseable.address.clone()), - Some(querier_auth_token), - ) - } - Mode::Ingest => (None, None), - }; - Self { version: CURRENT_STORAGE_METADATA_VERSION.to_string(), mode: CONFIG.storage_name.to_owned(), @@ -98,8 +78,6 @@ impl StorageMetadata { streams: Vec::new(), roles: HashMap::default(), default_role: None, - querier_endpoint, - querier_auth_token, } } pub fn global() -> &'static StaticStorageMetadata { From 4e6a676529c88b0052b4cd95eb09a57ad3b88604 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 25 Nov 2024 12:20:23 +0000 Subject: [PATCH 2/8] updated for distributed fresh deployments --- server/src/handlers/http/ingest.rs | 12 ++- server/src/handlers/http/logstream.rs | 50 ++++++++-- .../http/modal/ingest/ingester_logstream.rs | 19 ++-- .../http/modal/query/querier_logstream.rs | 14 ++- .../http/modal/utils/logstream_utils.rs | 23 ++--- server/src/metadata.rs | 5 +- server/src/migration.rs | 75 ++++----------- server/src/storage/azure_blob.rs | 43 +++++---- server/src/storage/object_storage.rs | 91 +++++++++++++++++++ 9 files changed, 221 insertions(+), 111 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 363f362f2..fbb5a66f0 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -153,7 +153,17 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result Result let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - if CONFIG.parseable.mode == Mode::Query { - create_stream_and_schema_from_storage(&stream_name).await?; + if CONFIG.parseable.mode != Mode::All { + if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { + if !stream_found { + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + } else { + return Err(StreamError::StreamNotFound(stream_name.clone())); + } } else { return Err(StreamError::StreamNotFound(stream_name)); } @@ -513,7 +531,11 @@ pub async fn get_stream_info(req: HttpRequest) -> Result Result Result = serde_json::from_slice(&body).unwrap(); let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; - let mut first_event_at: Option = None; - if let Err(err) = res { - log::error!("Failed to update manifest list in the snapshot {err:?}") - } else { - first_event_at = res.unwrap(); - } + let first_event_at: Option = res.unwrap_or_default(); Ok((first_event_at, StatusCode::OK)) } @@ -42,7 +41,11 @@ pub async fn retention_cleanup( pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - create_stream_and_schema_from_storage(&stream_name).await?; + if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { + if !stream_found { + return 
Err(StreamError::StreamNotFound(stream_name.clone())); + } + } } metadata::STREAM_INFO.delete_stream(&stream_name); diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index 13bdaede2..1172c182d 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -33,7 +33,11 @@ use crate::{ pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - create_stream_and_schema_from_storage(&stream_name).await?; + if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { + if !stream_found { + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + } } let objectstore = CONFIG.storage().get_object_store(); @@ -95,7 +99,13 @@ pub async fn get_stats(req: HttpRequest) -> Result let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { - create_stream_and_schema_from_storage(&stream_name).await?; + if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { + if !stream_found { + return Err(StreamError::StreamNotFound(stream_name)); + } + } else { + return Err(StreamError::StreamNotFound(stream_name)); + } } let query_string = req.query_string(); diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs index 6b023be9e..ae2cdf180 100644 --- a/server/src/handlers/http/modal/utils/logstream_utils.rs +++ b/server/src/handlers/http/modal/utils/logstream_utils.rs @@ -47,14 +47,16 @@ pub async fn create_update_stream( }); } - if CONFIG.parseable.mode == Mode::Query { - create_stream_and_schema_from_storage(stream_name).await?; + if !metadata::STREAM_INFO.stream_exists(stream_name) + && CONFIG.parseable.mode == Mode::Query + && create_stream_and_schema_from_storage(stream_name).await? 
+ { return Err(StreamError::Custom { - msg: format!( - "Logstream {stream_name} already exists, please create a new log stream with unique name" - ), - status: StatusCode::BAD_REQUEST, - }); + msg: format!( + "Logstream {stream_name} already exists, please create a new log stream with unique name" + ), + status: StatusCode::BAD_REQUEST, + }); } if update_stream == "true" { @@ -316,7 +318,6 @@ pub async fn update_custom_partition_in_stream( } } } - let storage = CONFIG.storage().get_object_store(); if let Err(err) = storage .update_custom_partition_in_stream(&stream_name, custom_partition) @@ -395,7 +396,7 @@ pub async fn create_stream( Ok(()) } -pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<(), StreamError> { +pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result { // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); let streams = storage.list_streams().await?; @@ -495,8 +496,8 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< stream_type, ); } else { - return Err(StreamError::StreamNotFound(stream_name.to_string())); + return Ok(false); } - Ok(()) + Ok(true) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index b582418ba..0c1fd97f0 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -32,6 +32,7 @@ use crate::metrics::{ EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, }; +use crate::option::{Mode, CONFIG}; use crate::storage::retention::Retention; use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType}; use crate::utils::arrow::MergedRecordReader; @@ -462,7 +463,9 @@ pub async fn load_stream_metadata_on_server_start( } let schema = update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?; - + if CONFIG.parseable.mode == Mode::Ingest { + storage.put_schema(stream_name, &schema).await?; + } //load stats from storage let stats = meta.stats; fetch_stats_from_storage(stream_name, stats).await; diff --git a/server/src/migration.rs b/server/src/migration.rs index 804e77fa0..bcaf892e7 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -24,15 +24,13 @@ mod stream_metadata_migration; use std::{fs::OpenOptions, sync::Arc}; use crate::{ - catalog::snapshot::Snapshot, hottier::{HotTierManager, CURRENT_HOT_TIER_VERSION}, metadata::load_stream_metadata_on_server_start, option::{validation::human_size_to_bytes, Config, Mode, CONFIG}, - stats::FullStats, storage::{ - object_storage::{parseable_json_path, schema_path, stream_json_path}, - ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME, - PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, + object_storage::{parseable_json_path, stream_json_path}, + ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, + SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, }, }; use arrow_schema::Schema; @@ -165,69 +163,30 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: let schema = if let Ok(schema) = storage.get_object(&query_schema_path).await { schema } else { - let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]); - let schema_obs = storage - .get_objects( - Some(&path), - Box::new(|file_name| { - file_name.starts_with(".ingestor") && file_name.ends_with("schema") - }), - ) + storage + .create_schema_from_ingestor(stream) .await - 
.into_iter() - .next(); - if let Some(schema_obs) = schema_obs { - let schema_ob = &schema_obs[0]; - storage - .put_object(&schema_path(stream), schema_ob.clone()) - .await?; - schema_ob.clone() - } else { - Bytes::new() - } + .unwrap_or_default() }; let path = stream_json_path(stream); let stream_metadata = if let Ok(stream_metadata) = storage.get_object(&path).await { stream_metadata - } else if CONFIG.parseable.mode != Mode::All { - let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]); - let stream_metadata_obs = storage - .get_objects( - Some(&path), - Box::new(|file_name| { - file_name.starts_with(".ingestor") && file_name.ends_with("stream.json") - }), - ) + } else { + let querier_stream = storage + .create_stream_from_querier(stream) .await - .into_iter() - .next(); - if let Some(stream_metadata_obs) = stream_metadata_obs { - if !stream_metadata_obs.is_empty() { - let stream_metadata_bytes = &stream_metadata_obs[0]; - let stream_ob_metdata = - serde_json::from_slice::(stream_metadata_bytes)?; - let stream_metadata = ObjectStoreFormat { - stats: FullStats::default(), - snapshot: Snapshot::default(), - ..stream_ob_metdata - }; - - let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into(); - storage - .put_object(&stream_json_path(stream), stream_metadata_bytes.clone()) - .await?; - - stream_metadata_bytes - } else { - Bytes::new() - } + .unwrap_or_default(); + if !querier_stream.is_empty() { + querier_stream } else { - Bytes::new() + storage + .create_stream_from_ingestor(stream) + .await + .unwrap_or_default() } - } else { - Bytes::new() }; + let mut stream_meta_found = true; if stream_metadata.is_empty() { if CONFIG.parseable.mode != Mode::Ingest { diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs index 6475b8fdc..aa79ac053 100644 --- a/server/src/storage/azure_blob.rs +++ b/server/src/storage/azure_blob.rs @@ -265,33 +265,32 @@ impl BlobStore { } async fn _list_streams(&self) -> Result, ObjectStorageError> { + let mut result_file_list: Vec = Vec::new(); let resp = self.client.list_with_delimiter(None).await?; - let common_prefixes = resp.common_prefixes; // get all dirs - - // return prefixes at the root level - let dirs: Vec<_> = common_prefixes + let streams = resp + .common_prefixes .iter() - .filter_map(|path| path.parts().next()) + .flat_map(|path| path.parts()) .map(|name| name.as_ref().to_string()) - .filter(|x| x != PARSEABLE_ROOT_DIRECTORY) - .filter(|x| x != USERS_ROOT_DIR) - .collect(); - - let stream_json_check = FuturesUnordered::new(); - - for dir in &dirs { - let key = format!( - "{}/{}/{}", - dir, STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME - ); - let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; - stream_json_check.push(task); + .filter(|name| name != PARSEABLE_ROOT_DIRECTORY) + .filter(|name| name != USERS_ROOT_DIR) + .collect::>(); + for stream in streams { + let stream_path = + object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY)); + let resp = self.client.list_with_delimiter(Some(&stream_path)).await?; + let stream_files: Vec = resp + .objects + .iter() + .filter(|name| name.location.filename().unwrap().ends_with("stream.json")) + .map(|name| name.location.to_string()) + .collect(); + if !stream_files.is_empty() { + result_file_list.push(LogStream { name: stream }); + } } - - stream_json_check.try_collect::<()>().await?; - - Ok(dirs.into_iter().map(|name| LogStream { name }).collect()) + Ok(result_file_list) } async fn 
_list_dates(&self, stream: &str) -> Result, ObjectStorageError> { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 2359efd6f..f12c88dd5 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -426,6 +426,97 @@ pub trait ObjectStorage: Sync + 'static { .await } + async fn create_stream_from_querier( + &self, + stream_name: &str, + ) -> Result { + let stream_path = RelativePathBuf::from_iter([ + stream_name, + STREAM_ROOT_DIRECTORY, + STREAM_METADATA_FILE_NAME, + ]); + if let Ok(querier_stream_json_bytes) = self.get_object(&stream_path).await { + let querier_stream_metadata = + serde_json::from_slice::(&querier_stream_json_bytes)?; + let stream_metadata = ObjectStoreFormat { + stats: FullStats::default(), + snapshot: Snapshot::default(), + ..querier_stream_metadata + }; + let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into(); + self.put_object( + &stream_json_path(stream_name), + stream_metadata_bytes.clone(), + ) + .await?; + return Ok(stream_metadata_bytes); + } + Ok(Bytes::new()) + } + + async fn create_stream_from_ingestor( + &self, + stream_name: &str, + ) -> Result { + let stream_path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); + let stream_metadata_obs = self + .get_objects( + Some(&stream_path), + Box::new(|file_name| { + file_name.starts_with(".ingestor") && file_name.ends_with("stream.json") + }), + ) + .await + .into_iter() + .next(); + if let Some(stream_metadata_obs) = stream_metadata_obs { + if !stream_metadata_obs.is_empty() { + let stream_metadata_bytes = &stream_metadata_obs[0]; + let stream_ob_metdata = + serde_json::from_slice::(stream_metadata_bytes)?; + let stream_metadata = ObjectStoreFormat { + stats: FullStats::default(), + snapshot: Snapshot::default(), + ..stream_ob_metdata + }; + + let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into(); + self.put_object( + &stream_json_path(stream_name), + stream_metadata_bytes.clone(), + ) + .await?; + + return Ok(stream_metadata_bytes); + } + } + Ok(Bytes::new()) + } + + async fn create_schema_from_ingestor( + &self, + stream_name: &str, + ) -> Result { + let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); + let schema_obs = self + .get_objects( + Some(&path), + Box::new(|file_name| { + file_name.starts_with(".ingestor") && file_name.ends_with("schema") + }), + ) + .await + .into_iter() + .next(); + if let Some(schema_obs) = schema_obs { + let schema_ob = &schema_obs[0]; + self.put_object(&schema_path(stream_name), schema_ob.clone()) + .await?; + return Ok(schema_ob.clone()); + } + Ok(Bytes::new()) + } + async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); From 8c24d0d82329f18bf48e2d37042aa30a41139e31 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 25 Nov 2024 15:22:16 +0000 Subject: [PATCH 3/8] migration update, remove querier endpoint and token from parseable.json --- server/src/migration.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/migration.rs b/server/src/migration.rs index bcaf892e7..d51a0a2ac 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -67,19 +67,27 @@ pub async fn run_metadata_migration( Some("v1") => { let mut metadata = metadata_migration::v1_v3(storage_metadata); metadata = metadata_migration::v3_v4(metadata); + metadata = metadata_migration::v4_v5(metadata); + 
metadata = metadata_migration::remove_querier_metadata(metadata);
             put_remote_metadata(&*object_store, &metadata).await?;
         }
         Some("v2") => {
             let mut metadata = metadata_migration::v2_v3(storage_metadata);
             metadata = metadata_migration::v3_v4(metadata);
+            metadata = metadata_migration::v4_v5(metadata);
+            metadata = metadata_migration::remove_querier_metadata(metadata);
             put_remote_metadata(&*object_store, &metadata).await?;
         }
         Some("v3") => {
-            let metadata = metadata_migration::v3_v4(storage_metadata);
+            let mut metadata = metadata_migration::v3_v4(storage_metadata);
+            metadata = metadata_migration::v4_v5(metadata);
+            metadata = metadata_migration::remove_querier_metadata(metadata);
             put_remote_metadata(&*object_store, &metadata).await?;
         }
         Some("v4") => {
-            let metadata = metadata_migration::v4_v5(storage_metadata);
+            let mut metadata = metadata_migration::v4_v5(storage_metadata);
+            metadata = metadata_migration::v4_v5(metadata);
+            metadata = metadata_migration::remove_querier_metadata(metadata);
             put_remote_metadata(&*object_store, &metadata).await?;
         }
         _ => {

From c66f09da8bc61bc0a8fa8eeb80f087a2e031121a Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 25 Nov 2024 18:54:31 +0000
Subject: [PATCH 4/8] bug fix for schema sync at server restart

---
 server/src/handlers/http/logstream.rs | 14 +++++++++++++-
 server/src/metadata.rs                |  5 +----
 server/src/migration.rs               | 27 +++++++++++++++++----------
 server/src/storage/object_storage.rs  | 14 ++++++++++++++
 4 files changed, 45 insertions(+), 15 deletions(-)

diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index fe88e56b5..4a13f0ffd 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -116,7 +116,19 @@ pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
 
 pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    let schema = STREAM_INFO.schema(&stream_name)?;
+    let schema = if let Ok(schema) = STREAM_INFO.schema(&stream_name) {
+        schema
+    } else if CONFIG.parseable.mode == Mode::Query {
+        let stream_found = create_stream_and_schema_from_storage(&stream_name).await?;
+        if !stream_found {
+            return Err(StreamError::StreamNotFound(stream_name.clone()));
+        } else {
+            STREAM_INFO.schema(&stream_name)?
+ } + } else { + return Err(StreamError::StreamNotFound(stream_name)); + }; + Ok((web::Json(schema), StatusCode::OK)) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 0c1fd97f0..c675f4714 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -32,7 +32,6 @@ use crate::metrics::{ EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, }; -use crate::option::{Mode, CONFIG}; use crate::storage::retention::Retention; use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType}; use crate::utils::arrow::MergedRecordReader; @@ -463,9 +462,7 @@ pub async fn load_stream_metadata_on_server_start( } let schema = update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?; - if CONFIG.parseable.mode == Mode::Ingest { - storage.put_schema(stream_name, &schema).await?; - } + storage.put_schema(stream_name, &schema).await?; //load stats from storage let stats = meta.stats; fetch_stats_from_storage(stream_name, stats).await; diff --git a/server/src/migration.rs b/server/src/migration.rs index d51a0a2ac..664bf2d70 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -28,9 +28,9 @@ use crate::{ metadata::load_stream_metadata_on_server_start, option::{validation::human_size_to_bytes, Config, Mode, CONFIG}, storage::{ - object_storage::{parseable_json_path, stream_json_path}, + object_storage::{parseable_json_path, schema_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, + STREAM_ROOT_DIRECTORY, }, }; use arrow_schema::Schema; @@ -166,15 +166,22 @@ async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> { async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> { let mut arrow_schema: Schema = Schema::empty(); - let query_schema_path = - RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); - let schema = if let Ok(schema) = storage.get_object(&query_schema_path).await { + let schema_path = schema_path(stream); + let schema = if let Ok(schema) = storage.get_object(&schema_path).await { schema } else { - storage - .create_schema_from_ingestor(stream) + let querier_schema = storage + .create_schema_from_querier(stream) .await - .unwrap_or_default() + .unwrap_or_default(); + if !querier_schema.is_empty() { + querier_schema + } else { + storage + .create_schema_from_ingestor(stream) + .await + .unwrap_or_default() + } }; let path = stream_json_path(stream); @@ -222,7 +229,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: let schema = serde_json::from_slice(&schema).ok(); arrow_schema = schema_migration::v1_v4(schema)?; storage - .put_object(&query_schema_path, to_bytes(&arrow_schema)) + .put_object(&schema_path, to_bytes(&arrow_schema)) .await?; } Some("v2") => { @@ -236,7 +243,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: let schema = serde_json::from_slice(&schema)?; arrow_schema = schema_migration::v2_v4(schema)?; storage - .put_object(&query_schema_path, to_bytes(&arrow_schema)) + .put_object(&schema_path, to_bytes(&arrow_schema)) .await?; } Some("v3") => { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index f12c88dd5..d18c87ae9 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -493,6 +493,20 @@ pub trait ObjectStorage: 
Sync + 'static { Ok(Bytes::new()) } + async fn create_schema_from_querier( + &self, + stream_name: &str, + ) -> Result { + let path = + RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); + if let Ok(querier_schema_bytes) = self.get_object(&path).await { + self.put_object(&schema_path(stream_name), querier_schema_bytes.clone()) + .await?; + return Ok(querier_schema_bytes); + } + Ok(Bytes::new()) + } + async fn create_schema_from_ingestor( &self, stream_name: &str, From f537c9f4a4253149d871cc4ff52db873a6f1181d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 26 Nov 2024 07:20:15 +0000 Subject: [PATCH 5/8] code refactor --- server/src/handlers/http/ingest.rs | 9 +- server/src/handlers/http/logstream.rs | 86 ++++++++---------- .../http/modal/ingest/ingester_logstream.rs | 25 +++--- .../http/modal/query/querier_logstream.rs | 26 +++--- .../http/modal/utils/logstream_utils.rs | 87 ++++--------------- server/src/storage/azure_blob.rs | 13 ++- server/src/storage/object_storage.rs | 18 ++-- server/src/storage/s3.rs | 13 ++- 8 files changed, 101 insertions(+), 176 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index fbb5a66f0..d8be2ca9d 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -154,12 +154,9 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result {} + Ok(false) | Err(_) => return Err(PostError::StreamNotFound(stream_name.clone())), } } else { return Err(PostError::StreamNotFound(stream_name.clone())); diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 4a13f0ffd..e0a8deb00 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -116,17 +116,16 @@ pub async fn detect_schema(body: Bytes) -> Result { pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let schema = if let Ok(schema) = STREAM_INFO.schema(&stream_name) { - schema - } else if CONFIG.parseable.mode == Mode::Query { - let stream_found = create_stream_and_schema_from_storage(&stream_name).await?; - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } else { - STREAM_INFO.schema(&stream_name)? + let schema = match STREAM_INFO.schema(&stream_name) { + Ok(schema) => schema, + Err(_) if CONFIG.parseable.mode == Mode::Query => { + if create_stream_and_schema_from_storage(&stream_name).await? { + STREAM_INFO.schema(&stream_name)? + } else { + return Err(StreamError::StreamNotFound(stream_name.clone())); + } } - } else { - return Err(StreamError::StreamNotFound(stream_name)); + Err(_) => return Err(StreamError::StreamNotFound(stream_name)), }; Ok((web::Json(schema), StatusCode::OK)) @@ -196,12 +195,9 @@ pub async fn put_alert( if !STREAM_INFO.stream_initialized(&stream_name)? 
{ if CONFIG.parseable.mode == Mode::Query { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - } else { - return Err(StreamError::StreamNotFound(stream_name.clone())); + match create_stream_and_schema_from_storage(&stream_name).await { + Ok(true) => {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), } } else { return Err(StreamError::UninitializedLogstream); @@ -244,12 +240,9 @@ pub async fn get_retention(req: HttpRequest) -> Result {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), } } else { return Err(StreamError::StreamNotFound(stream_name)); @@ -362,14 +355,11 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - if CONFIG.parseable.mode != Mode::All { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - } else { - return Err(StreamError::StreamNotFound(stream_name.clone())); + if !STREAM_INFO.stream_exists(&stream_name) { + if CONFIG.parseable.mode == Mode::Query { + match create_stream_and_schema_from_storage(&stream_name).await { + Ok(true) => {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), } } else { return Err(StreamError::StreamNotFound(stream_name)); @@ -541,12 +531,11 @@ pub async fn create_stream( pub async fn get_stream_info(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { + if !STREAM_INFO.stream_exists(&stream_name) { if CONFIG.parseable.mode == Mode::Query { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } + match create_stream_and_schema_from_storage(&stream_name).await { + Ok(true) => {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), } } else { return Err(StreamError::StreamNotFound(stream_name)); @@ -592,12 +581,11 @@ pub async fn put_stream_hot_tier( body: web::Json, ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { + if !STREAM_INFO.stream_exists(&stream_name) { if CONFIG.parseable.mode == Mode::Query { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } + match create_stream_and_schema_from_storage(&stream_name).await { + Ok(true) => {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), } } else { return Err(StreamError::StreamNotFound(stream_name)); @@ -650,12 +638,11 @@ pub async fn put_stream_hot_tier( pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { + if !STREAM_INFO.stream_exists(&stream_name) { if CONFIG.parseable.mode == Mode::Query { - if let Ok(stream_found) = 
create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } + match create_stream_and_schema_from_storage(&stream_name).await { + Ok(true) => {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), } } else { return Err(StreamError::StreamNotFound(stream_name)); @@ -683,12 +670,11 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { + if !STREAM_INFO.stream_exists(&stream_name) { if CONFIG.parseable.mode == Mode::Query { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } + match create_stream_and_schema_from_storage(&stream_name).await { + Ok(true) => {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), } } else { return Err(StreamError::StreamNotFound(stream_name)); diff --git a/server/src/handlers/http/modal/ingest/ingester_logstream.rs b/server/src/handlers/http/modal/ingest/ingester_logstream.rs index b62ee341e..ce077a53e 100644 --- a/server/src/handlers/http/modal/ingest/ingester_logstream.rs +++ b/server/src/handlers/http/modal/ingest/ingester_logstream.rs @@ -24,13 +24,14 @@ pub async fn retention_cleanup( ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let storage = CONFIG.storage().get_object_store(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - } + if !metadata::STREAM_INFO.stream_exists(&stream_name) + && !create_stream_and_schema_from_storage(&stream_name) + .await + .unwrap_or(false) + { + return Err(StreamError::StreamNotFound(stream_name.clone())); } + let date_list: Vec = serde_json::from_slice(&body).unwrap(); let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; let first_event_at: Option = res.unwrap_or_default(); @@ -40,12 +41,12 @@ pub async fn retention_cleanup( pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - } + if !metadata::STREAM_INFO.stream_exists(&stream_name) + && !create_stream_and_schema_from_storage(&stream_name) + .await + .unwrap_or(false) + { + return Err(StreamError::StreamNotFound(stream_name.clone())); } metadata::STREAM_INFO.delete_stream(&stream_name); diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index 1172c182d..619379a5b 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -32,12 +32,12 @@ use crate::{ pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - if let Ok(stream_found) = 
create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - } + if !metadata::STREAM_INFO.stream_exists(&stream_name) + && !create_stream_and_schema_from_storage(&stream_name) + .await + .unwrap_or(false) + { + return Err(StreamError::StreamNotFound(stream_name.clone())); } let objectstore = CONFIG.storage().get_object_store(); @@ -98,14 +98,12 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await { - if !stream_found { - return Err(StreamError::StreamNotFound(stream_name)); - } - } else { - return Err(StreamError::StreamNotFound(stream_name)); - } + if !metadata::STREAM_INFO.stream_exists(&stream_name) + && !create_stream_and_schema_from_storage(&stream_name) + .await + .unwrap_or(false) + { + return Err(StreamError::StreamNotFound(stream_name.clone())); } let query_string = req.query_string(); diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs index ae2cdf180..f5c6312e4 100644 --- a/server/src/handlers/http/modal/utils/logstream_utils.rs +++ b/server/src/handlers/http/modal/utils/logstream_utils.rs @@ -4,10 +4,8 @@ use actix_web::{http::header::HeaderMap, HttpRequest}; use arrow_schema::{Field, Schema}; use bytes::Bytes; use http::StatusCode; -use relative_path::RelativePathBuf; use crate::{ - catalog::snapshot::Snapshot, handlers::{ http::logstream::error::{CreateStreamError, StreamError}, CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, @@ -16,11 +14,7 @@ use crate::{ metadata::{self, STREAM_INFO}, option::{Mode, CONFIG}, static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}, - stats::FullStats, - storage::{ - object_storage::{schema_path, stream_json_path}, - LogStream, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY, - }, + storage::{LogStream, ObjectStoreFormat, StreamType}, validator, }; @@ -404,49 +398,15 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< name: stream_name.to_owned(), }) { let mut stream_metadata = ObjectStoreFormat::default(); - let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); - let stream_obs = storage - .get_objects( - Some(&path), - Box::new(|file_name| { - file_name.starts_with(".ingestor") && file_name.ends_with("stream.json") - }), - ) - .await - .into_iter() - .next(); - if let Some(stream_obs) = stream_obs { - let stream_ob = &stream_obs[0]; - let stream_ob_metdata = serde_json::from_slice::(stream_ob)?; - stream_metadata = ObjectStoreFormat { - stats: FullStats::default(), - snapshot: Snapshot::default(), - ..stream_ob_metdata - }; - - let stream_metadata_bytes = serde_json::to_vec(&stream_metadata)?.into(); - storage - .put_object(&stream_json_path(stream_name), stream_metadata_bytes) - .await?; + let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?; + if !stream_metadata_bytes.is_empty() { + stream_metadata = serde_json::from_slice::(&stream_metadata_bytes)?; } let mut schema = Arc::new(Schema::empty()); - let schema_obs = storage - .get_objects( - Some(&path), - Box::new(|file_name| { - file_name.starts_with(".ingestor") && file_name.ends_with("schema") - }), - ) - .await - .into_iter() - .next(); - 
if let Some(schema_obs) = schema_obs { - let schema_ob = &schema_obs[0]; - storage - .put_object(&schema_path(stream_name), schema_ob.clone()) - .await?; - schema = serde_json::from_slice::>(schema_ob)?; + let schema_bytes = storage.create_schema_from_ingestor(stream_name).await?; + if !schema_bytes.is_empty() { + schema = serde_json::from_slice::>(&schema_bytes)?; } let mut static_schema: HashMap> = HashMap::new(); @@ -459,31 +419,14 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< static_schema.insert(field_name, field); } - let time_partition = if stream_metadata.time_partition.is_some() { - stream_metadata.time_partition.as_ref().unwrap() - } else { - "" - }; - let time_partition_limit = if stream_metadata.time_partition_limit.is_some() { - stream_metadata.time_partition_limit.as_ref().unwrap() - } else { - "" - }; - let custom_partition = if stream_metadata.custom_partition.is_some() { - stream_metadata.custom_partition.as_ref().unwrap() - } else { - "" - }; - let static_schema_flag = if stream_metadata.static_schema_flag.is_some() { - stream_metadata.static_schema_flag.as_ref().unwrap() - } else { - "" - }; - let stream_type = if stream_metadata.stream_type.is_some() { - stream_metadata.stream_type.as_ref().unwrap() - } else { - "" - }; + let time_partition = stream_metadata.time_partition.as_deref().unwrap_or(""); + let time_partition_limit = stream_metadata + .time_partition_limit + .as_deref() + .unwrap_or(""); + let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); + let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or(""); + let stream_type = stream_metadata.stream_type.as_deref().unwrap_or(""); metadata::STREAM_INFO.add_stream( stream_name.to_string(), diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs index aa79ac053..c5491be6f 100644 --- a/server/src/storage/azure_blob.rs +++ b/server/src/storage/azure_blob.rs @@ -273,23 +273,22 @@ impl BlobStore { .iter() .flat_map(|path| path.parts()) .map(|name| name.as_ref().to_string()) - .filter(|name| name != PARSEABLE_ROOT_DIRECTORY) - .filter(|name| name != USERS_ROOT_DIR) + .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR) .collect::>(); + for stream in streams { let stream_path = object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY)); let resp = self.client.list_with_delimiter(Some(&stream_path)).await?; - let stream_files: Vec = resp + if resp .objects .iter() - .filter(|name| name.location.filename().unwrap().ends_with("stream.json")) - .map(|name| name.location.to_string()) - .collect(); - if !stream_files.is_empty() { + .any(|name| name.location.filename().unwrap().ends_with("stream.json")) + { result_file_list.push(LogStream { name: stream }); } } + Ok(result_file_list) } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index d18c87ae9..2730fe563 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -435,6 +435,7 @@ pub trait ObjectStorage: Sync + 'static { STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME, ]); + if let Ok(querier_stream_json_bytes) = self.get_object(&stream_path).await { let querier_stream_metadata = serde_json::from_slice::(&querier_stream_json_bytes)?; @@ -451,6 +452,7 @@ pub trait ObjectStorage: Sync + 'static { .await?; return Ok(stream_metadata_bytes); } + Ok(Bytes::new()) } @@ -459,7 +461,7 @@ pub trait ObjectStorage: Sync + 'static { stream_name: &str, ) 
-> Result { let stream_path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); - let stream_metadata_obs = self + if let Some(stream_metadata_obs) = self .get_objects( Some(&stream_path), Box::new(|file_name| { @@ -468,16 +470,16 @@ pub trait ObjectStorage: Sync + 'static { ) .await .into_iter() - .next(); - if let Some(stream_metadata_obs) = stream_metadata_obs { + .next() + { if !stream_metadata_obs.is_empty() { let stream_metadata_bytes = &stream_metadata_obs[0]; - let stream_ob_metdata = + let stream_ob_metadata = serde_json::from_slice::(stream_metadata_bytes)?; let stream_metadata = ObjectStoreFormat { stats: FullStats::default(), snapshot: Snapshot::default(), - ..stream_ob_metdata + ..stream_ob_metadata }; let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into(); @@ -512,7 +514,7 @@ pub trait ObjectStorage: Sync + 'static { stream_name: &str, ) -> Result { let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); - let schema_obs = self + if let Some(schema_obs) = self .get_objects( Some(&path), Box::new(|file_name| { @@ -521,8 +523,8 @@ pub trait ObjectStorage: Sync + 'static { ) .await .into_iter() - .next(); - if let Some(schema_obs) = schema_obs { + .next() + { let schema_ob = &schema_obs[0]; self.put_object(&schema_path(stream_name), schema_ob.clone()) .await?; diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 400f305b8..6a546a148 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -401,23 +401,22 @@ impl S3 { .iter() .flat_map(|path| path.parts()) .map(|name| name.as_ref().to_string()) - .filter(|name| name != PARSEABLE_ROOT_DIRECTORY) - .filter(|name| name != USERS_ROOT_DIR) + .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR) .collect::>(); + for stream in streams { let stream_path = object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY)); let resp = self.client.list_with_delimiter(Some(&stream_path)).await?; - let stream_files: Vec = resp + if resp .objects .iter() - .filter(|name| name.location.filename().unwrap().ends_with("stream.json")) - .map(|name| name.location.to_string()) - .collect(); - if !stream_files.is_empty() { + .any(|name| name.location.filename().unwrap().ends_with("stream.json")) + { result_file_list.push(LogStream { name: stream }); } } + Ok(result_file_list) } From 2fac5a927a0bebb6256537dd1ccf952739960e7e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 26 Nov 2024 08:08:10 +0000 Subject: [PATCH 6/8] refactor, added comments, added license info --- server/src/handlers/http/ingest.rs | 17 ++++----- server/src/handlers/http/logstream.rs | 38 +++++++++++++++++++ ...{ingester_ingest.rs => ingestor_ingest.rs} | 18 +++++++++ ...ter_logstream.rs => ingestor_logstream.rs} | 24 ++++++++++++ .../{ingester_rbac.rs => ingestor_rbac.rs} | 18 +++++++++ .../{ingester_role.rs => ingestor_role.rs} | 18 +++++++++ server/src/handlers/http/modal/ingest/mod.rs | 24 ++++++++++-- .../src/handlers/http/modal/ingest_server.rs | 26 ++++++------- server/src/handlers/http/modal/query/mod.rs | 18 +++++++++ .../http/modal/query/querier_ingest.rs | 18 +++++++++ .../http/modal/query/querier_logstream.rs | 25 ++++++++++++ .../handlers/http/modal/query/querier_rbac.rs | 18 +++++++++ .../handlers/http/modal/query/querier_role.rs | 18 +++++++++ .../handlers/http/modal/utils/ingest_utils.rs | 18 +++++++++ .../http/modal/utils/logstream_utils.rs | 24 ++++++++++++ server/src/handlers/http/modal/utils/mod.rs | 18 +++++++++ 
.../handlers/http/modal/utils/rbac_utils.rs | 18 +++++++++ server/src/handlers/http/query.rs | 4 ++ server/src/migration.rs | 16 ++++++++ server/src/migration/metadata_migration.rs | 1 + server/src/storage.rs | 2 +- server/src/storage/object_storage.rs | 4 ++ 22 files changed, 359 insertions(+), 26 deletions(-) rename server/src/handlers/http/modal/ingest/{ingester_ingest.rs => ingestor_ingest.rs} (58%) rename server/src/handlers/http/modal/ingest/{ingester_logstream.rs => ingestor_logstream.rs} (81%) rename server/src/handlers/http/modal/ingest/{ingester_rbac.rs => ingestor_rbac.rs} (83%) rename server/src/handlers/http/modal/ingest/{ingester_role.rs => ingestor_role.rs} (51%) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index d8be2ca9d..f94faefce 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -32,7 +32,7 @@ use crate::localcache::CacheError; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; -use crate::storage::{LogStream, ObjectStorageError, StreamType}; +use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_array::RecordBatch; @@ -153,6 +153,9 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result {} @@ -198,15 +201,11 @@ pub async fn create_stream_if_not_exists( return Ok(stream_exists); } + // For distributed deployments, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if CONFIG.parseable.mode != Mode::All { - let store = CONFIG.storage().get_object_store(); - let streams = store.list_streams().await?; - if streams.contains(&LogStream { - name: stream_name.to_owned(), - }) { - create_stream_and_schema_from_storage(stream_name).await?; - return Ok(stream_exists); - } + return Ok(create_stream_and_schema_from_storage(stream_name).await?); } super::logstream::create_stream( diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e0a8deb00..5304c18c2 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -84,6 +84,7 @@ pub async fn delete(req: HttpRequest) -> Result { } pub async fn list(_: HttpRequest) -> impl Responder { + //list all streams from storage let res = CONFIG .storage() .get_object_store() @@ -118,6 +119,10 @@ pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let schema = match STREAM_INFO.schema(&stream_name) { Ok(schema) => schema, + + //if schema not found in memory map + //create stream and schema from storage and memory + //return from memory map Err(_) if CONFIG.parseable.mode == Mode::Query => { if create_stream_and_schema_from_storage(&stream_name).await? { STREAM_INFO.schema(&stream_name)? @@ -194,6 +199,9 @@ pub async fn put_alert( validator::alert(&alerts)?; if !STREAM_INFO.stream_initialized(&stream_name)? 
{ + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if CONFIG.parseable.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} @@ -239,6 +247,9 @@ pub async fn put_alert( pub async fn get_retention(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !STREAM_INFO.stream_exists(&stream_name) { + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if CONFIG.parseable.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} @@ -267,6 +278,21 @@ pub async fn put_retention( body: web::Json, ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !STREAM_INFO.stream_exists(&stream_name) { + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if CONFIG.parseable.mode == Mode::Query { + match create_stream_and_schema_from_storage(&stream_name).await { + Ok(true) => {} + Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), + } + } else { + return Err(StreamError::StreamNotFound(stream_name)); + } + } + let body = body.into_inner(); let retention: Retention = match serde_json::from_value(body) { @@ -356,6 +382,9 @@ pub async fn get_stats(req: HttpRequest) -> Result let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !STREAM_INFO.stream_exists(&stream_name) { + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if CONFIG.parseable.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} @@ -582,6 +611,9 @@ pub async fn put_stream_hot_tier( ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !STREAM_INFO.stream_exists(&stream_name) { + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if CONFIG.parseable.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} @@ -639,6 +671,9 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result {} @@ -671,6 +706,9 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result {} diff --git a/server/src/handlers/http/modal/ingest/ingester_ingest.rs b/server/src/handlers/http/modal/ingest/ingestor_ingest.rs similarity index 58% rename from server/src/handlers/http/modal/ingest/ingester_ingest.rs rename to server/src/handlers/http/modal/ingest/ingestor_ingest.rs index f7725254a..e91a27614 100644 --- a/server/src/handlers/http/modal/ingest/ingester_ingest.rs +++ b/server/src/handlers/http/modal/ingest/ingestor_ingest.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use actix_web::{HttpRequest, HttpResponse}; use bytes::Bytes; diff --git a/server/src/handlers/http/modal/ingest/ingester_logstream.rs b/server/src/handlers/http/modal/ingest/ingestor_logstream.rs similarity index 81% rename from server/src/handlers/http/modal/ingest/ingester_logstream.rs rename to server/src/handlers/http/modal/ingest/ingestor_logstream.rs index ce077a53e..88ad68765 100644 --- a/server/src/handlers/http/modal/ingest/ingester_logstream.rs +++ b/server/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use actix_web::{web, HttpRequest, Responder}; use bytes::Bytes; use http::StatusCode; @@ -24,6 +42,9 @@ pub async fn retention_cleanup( ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let storage = CONFIG.storage().get_object_store(); + // if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if !metadata::STREAM_INFO.stream_exists(&stream_name) && !create_stream_and_schema_from_storage(&stream_name) .await @@ -41,6 +62,9 @@ pub async fn retention_cleanup( pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + // if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if !metadata::STREAM_INFO.stream_exists(&stream_name) && !create_stream_and_schema_from_storage(&stream_name) .await diff --git a/server/src/handlers/http/modal/ingest/ingester_rbac.rs b/server/src/handlers/http/modal/ingest/ingestor_rbac.rs similarity index 83% rename from server/src/handlers/http/modal/ingest/ingester_rbac.rs rename to server/src/handlers/http/modal/ingest/ingestor_rbac.rs index 157b52959..f25abe688 100644 --- a/server/src/handlers/http/modal/ingest/ingester_rbac.rs +++ b/server/src/handlers/http/modal/ingest/ingestor_rbac.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::collections::HashSet; use actix_web::{web, Responder}; diff --git a/server/src/handlers/http/modal/ingest/ingester_role.rs b/server/src/handlers/http/modal/ingest/ingestor_role.rs similarity index 51% rename from server/src/handlers/http/modal/ingest/ingester_role.rs rename to server/src/handlers/http/modal/ingest/ingestor_role.rs index 0ad41e765..499157136 100644 --- a/server/src/handlers/http/modal/ingest/ingester_role.rs +++ b/server/src/handlers/http/modal/ingest/ingestor_role.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use actix_web::{web, HttpResponse, Responder}; use bytes::Bytes; diff --git a/server/src/handlers/http/modal/ingest/mod.rs b/server/src/handlers/http/modal/ingest/mod.rs index 26ed76438..c6a32dff7 100644 --- a/server/src/handlers/http/modal/ingest/mod.rs +++ b/server/src/handlers/http/modal/ingest/mod.rs @@ -1,3 +1,21 @@ -pub mod ingester_logstream; -pub mod ingester_rbac; -pub mod ingester_role; +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +pub mod ingestor_logstream; +pub mod ingestor_rbac; +pub mod ingestor_role; diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index f6e54ce8e..1e0e9dd21 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -40,9 +40,9 @@ use crate::sync; use std::sync::Arc; -use super::ingest::ingester_logstream; -use super::ingest::ingester_rbac; -use super::ingest::ingester_role; +use super::ingest::ingestor_logstream; +use super::ingest::ingestor_rbac; +use super::ingest::ingestor_role; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; use super::IngestorMetadata; @@ -251,7 +251,7 @@ impl IngestServer { ) .service( resource("/{name}/sync") - .route(web::put().to(ingester_role::put).authorize(Action::PutRole)), + .route(web::put().to(ingestor_role::put).authorize(Action::PutRole)), ) } // get the user webscope @@ -262,13 +262,13 @@ impl IngestServer { // PUT /user/{username}/sync => Sync creation of a new user .route( web::post() - .to(ingester_rbac::post_user) + .to(ingestor_rbac::post_user) .authorize(Action::PutUser), ) // DELETE /user/{username} => Sync deletion of a user .route( web::delete() - .to(ingester_rbac::delete_user) + .to(ingestor_rbac::delete_user) .authorize(Action::DeleteUser), ) .wrap(DisAllowRootUser), @@ -278,7 +278,7 @@ impl IngestServer { // PUT /user/{username}/roles => Put roles for user .route( web::put() - .to(ingester_rbac::put_role) + .to(ingestor_rbac::put_role) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), ), @@ -288,7 +288,7 @@ impl IngestServer { // POST /user/{username}/generate-new-password => reset password for this user .route( web::post() - .to(ingester_rbac::post_gen_password) + .to(ingestor_rbac::post_gen_password) .authorize(Action::PutUser) .wrap(DisAllowRootUser), ), @@ -311,13 +311,13 @@ impl IngestServer { // DELETE "/logstream/{logstream}/sync" ==> Sync deletion of a log stream .route( web::delete() - .to(ingester_logstream::delete) + .to(ingestor_logstream::delete) .authorize(Action::DeleteStream), ) // PUT "/logstream/{logstream}/sync" ==> Sync creation of a new log stream .route( web::put() - .to(ingester_logstream::put_stream) + .to(ingestor_logstream::put_stream) .authorize_for_stream(Action::CreateStream), ), ) @@ -342,13 +342,13 @@ impl IngestServer { // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream .route( web::put() - .to(ingester_logstream::put_enable_cache) + .to(ingestor_logstream::put_enable_cache) .authorize_for_stream(Action::PutCacheEnabled), ) // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream .route( web::get() - .to(ingester_logstream::get_cache_enabled) + .to(ingestor_logstream::get_cache_enabled) .authorize_for_stream(Action::GetCacheEnabled), ), ) @@ -356,7 +356,7 @@ impl IngestServer { web::scope("/retention").service( web::resource("/cleanup").route( web::post() - .to(ingester_logstream::retention_cleanup) + .to(ingestor_logstream::retention_cleanup) .authorize_for_stream(Action::PutRetention), ), ), diff --git a/server/src/handlers/http/modal/query/mod.rs b/server/src/handlers/http/modal/query/mod.rs index 704f9ca54..8ef11dd60 100644 --- a/server/src/handlers/http/modal/query/mod.rs +++ b/server/src/handlers/http/modal/query/mod.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + pub mod querier_ingest; pub mod querier_logstream; pub mod querier_rbac; diff --git a/server/src/handlers/http/modal/query/querier_ingest.rs b/server/src/handlers/http/modal/query/querier_ingest.rs index 2e5e140c6..1eff3999a 100644 --- a/server/src/handlers/http/modal/query/querier_ingest.rs +++ b/server/src/handlers/http/modal/query/querier_ingest.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::handlers::http::ingest::PostError; use actix_web::{HttpRequest, HttpResponse}; use bytes::Bytes; diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index 619379a5b..86e887e3b 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + use core::str; use std::fs; @@ -32,6 +50,10 @@ use crate::{ pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + // if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if !metadata::STREAM_INFO.stream_exists(&stream_name) && !create_stream_and_schema_from_storage(&stream_name) .await @@ -98,6 +120,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + // if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if !metadata::STREAM_INFO.stream_exists(&stream_name) && !create_stream_and_schema_from_storage(&stream_name) .await diff --git a/server/src/handlers/http/modal/query/querier_rbac.rs b/server/src/handlers/http/modal/query/querier_rbac.rs index a5b88c33b..ae2af1c2d 100644 --- a/server/src/handlers/http/modal/query/querier_rbac.rs +++ b/server/src/handlers/http/modal/query/querier_rbac.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::collections::HashSet; use actix_web::{web, Responder}; diff --git a/server/src/handlers/http/modal/query/querier_role.rs b/server/src/handlers/http/modal/query/querier_role.rs index c17489273..b9930579c 100644 --- a/server/src/handlers/http/modal/query/querier_role.rs +++ b/server/src/handlers/http/modal/query/querier_role.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use actix_web::{web, HttpResponse, Responder}; use bytes::Bytes; diff --git a/server/src/handlers/http/modal/utils/ingest_utils.rs b/server/src/handlers/http/modal/utils/ingest_utils.rs index 9d29d0a76..81ccfbd44 100644 --- a/server/src/handlers/http/modal/utils/ingest_utils.rs +++ b/server/src/handlers/http/modal/utils/ingest_utils.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::{ collections::{BTreeMap, HashMap}, sync::Arc, diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs index f5c6312e4..62bf90f61 100644 --- a/server/src/handlers/http/modal/utils/logstream_utils.rs +++ b/server/src/handlers/http/modal/utils/logstream_utils.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::{collections::HashMap, num::NonZeroU32, sync::Arc}; use actix_web::{http::header::HeaderMap, HttpRequest}; @@ -41,6 +59,9 @@ pub async fn create_update_stream( }); } + // if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage if !metadata::STREAM_INFO.stream_exists(stream_name) && CONFIG.parseable.mode == Mode::Query && create_stream_and_schema_from_storage(stream_name).await? @@ -390,6 +411,9 @@ pub async fn create_stream( Ok(()) } +/// list all streams from storage +/// if stream exists in storage, create stream and schema from storage +/// and add it to the memory map pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result { // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); diff --git a/server/src/handlers/http/modal/utils/mod.rs b/server/src/handlers/http/modal/utils/mod.rs index 7ec7e1cbd..61930d43d 100644 --- a/server/src/handlers/http/modal/utils/mod.rs +++ b/server/src/handlers/http/modal/utils/mod.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ *
+ */
+
 pub mod ingest_utils;
 pub mod logstream_utils;
 pub mod rbac_utils;
diff --git a/server/src/handlers/http/modal/utils/rbac_utils.rs b/server/src/handlers/http/modal/utils/rbac_utils.rs
index 195a69a69..fb8d2e276 100644
--- a/server/src/handlers/http/modal/utils/rbac_utils.rs
+++ b/server/src/handlers/http/modal/utils/rbac_utils.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use crate::{
     option::CONFIG,
     storage::{self, ObjectStorageError, StorageMetadata},
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 5115c1a8c..f99b170f2 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -77,6 +77,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
         Ok(raw_logical_plan) => raw_logical_plan,
         Err(_) => {
+            //if logical plan creation fails, create the missing streams and try again
             create_streams_for_querier().await;
             session_state
                 .create_logical_plan(&query_request.query)
@@ -187,6 +188,9 @@ pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
     Ok(())
 }
 
+/// Create streams for the querier if they do not exist:
+/// get the list of streams from memory and from storage,
+/// then create in memory any stream that is present only in storage
 pub async fn create_streams_for_querier() {
     let querier_streams = STREAM_INFO.list_streams();
     let store = CONFIG.storage().get_object_store();
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 664bf2d70..c57eafcff 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -65,6 +65,8 @@ pub async fn run_metadata_migration(
     if let Some(storage_metadata) = storage_metadata {
         match get_version(&storage_metadata) {
             Some("v1") => {
+                //migrate to latest version
+                //remove querier endpoint and token from storage metadata
                 let mut metadata = metadata_migration::v1_v3(storage_metadata);
                 metadata = metadata_migration::v3_v4(metadata);
                 metadata = metadata_migration::v4_v5(metadata);
                 metadata = metadata_migration::remove_querier_metadata(metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
             Some("v2") => {
+                //migrate to latest version
+                //remove querier endpoint and token from storage metadata
                 let mut metadata = metadata_migration::v2_v3(storage_metadata);
                 metadata = metadata_migration::v3_v4(metadata);
                 metadata = metadata_migration::v4_v5(metadata);
                 metadata = metadata_migration::remove_querier_metadata(metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
             Some("v3") => {
+                //migrate to latest version
+                //remove querier endpoint and token from storage metadata
                 let mut metadata = metadata_migration::v3_v4(storage_metadata);
                 metadata = metadata_migration::v4_v5(metadata);
                 metadata = metadata_migration::remove_querier_metadata(metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
             Some("v4") => {
+                //migrate to latest version
+                //remove querier endpoint and token from storage metadata
                 let mut metadata = metadata_migration::v4_v5(storage_metadata);
                 metadata = metadata_migration::v4_v5(metadata);
                 metadata = metadata_migration::remove_querier_metadata(metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
             _ => {
+                //remove querier endpoint and token from storage metadata
                 let metadata = metadata_migration::remove_querier_metadata(storage_metadata);
                 put_remote_metadata(&*object_store, &metadata).await?;
             }
@@ -166,6 +175,10 @@ async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> {
 async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
     let mut arrow_schema: Schema = Schema::empty();
+
+    //check if a schema exists for this node
+    //if not, create the schema from the querier's schema in storage
+    //if the querier has none, create it from an ingestor's schema in storage
     let schema_path = schema_path(stream);
     let schema = if let Ok(schema) = storage.get_object(&schema_path).await {
         schema
 
+    //check if stream.json exists for this node
+    //if not, create stream.json from the querier's stream.json in storage
+    //if the querier has none, create it from an ingestor's stream.json in storage
     let path = stream_json_path(stream);
     let stream_metadata = if let Ok(stream_metadata) = storage.get_object(&path).await {
         stream_metadata
diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs
index 0eb9363a8..f6a194356 100644
--- a/server/src/migration/metadata_migration.rs
+++ b/server/src/migration/metadata_migration.rs
@@ -181,6 +181,7 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
     storage_metadata
 }
 
+/// Remove the querier endpoint and auth token from the storage metadata
 pub fn remove_querier_metadata(mut storage_metadata: JsonValue) -> JsonValue {
     let metadata = storage_metadata.as_object_mut().unwrap();
     metadata.remove("querier_endpoint");
diff --git a/server/src/storage.rs b/server/src/storage.rs
index b06dd368f..a018c2b1c 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -202,7 +202,7 @@ impl ObjectStoreFormat {
     }
 }
 
-#[derive(serde::Serialize, PartialEq, Debug)]
+#[derive(serde::Serialize, PartialEq)]
 pub struct LogStream {
     pub name: String,
 }
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 2730fe563..ff2a56953 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -426,6 +426,7 @@ pub trait ObjectStorage: Sync + 'static {
             .await
     }
 
+    /// Create the stream from the querier's stream.json in storage
     async fn create_stream_from_querier(
         &self,
         stream_name: &str,
@@ -456,6 +457,7 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(Bytes::new())
     }
 
+    /// Create the stream from an ingestor's stream.json in storage
     async fn create_stream_from_ingestor(
         &self,
         stream_name: &str,
@@ -495,6 +497,7 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(Bytes::new())
     }
 
+    /// Create the schema from the querier's schema in storage
     async fn create_schema_from_querier(
         &self,
         stream_name: &str,
@@ -509,6 +512,7 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(Bytes::new())
     }
 
+    /// Create the schema from an ingestor's schema in storage
     async fn create_schema_from_ingestor(
         &self,
         stream_name: &str,
From 2b395c1253dcddfbbaf7e55c1642caaef47c5d4c Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Tue, 26 Nov 2024 10:20:04 +0000
Subject: [PATCH 7/8] deepsource analysis fix

---
 .../http/modal/utils/logstream_utils.rs | 113 +++++++++++-------
 1 file changed, 68 insertions(+), 45 deletions(-)

diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs
index 62bf90f61..0081a258a 100644
--- a/server/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/server/src/handlers/http/modal/utils/logstream_utils.rs
@@ -46,11 +46,11 @@ pub async fn create_update_stream(
         time_partition_limit,
         custom_partition,
         static_schema_flag,
-        update_stream,
+        update_stream_flag,
         stream_type,
     ) = fetch_headers_from_put_stream_request(req);
 
-    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
+    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" {
         return Err(StreamError::Custom {
             msg: format!(
                 "Logstream {stream_name} already exists, please create a new log stream with unique name"
@@ -59,58 +59,36 @@ pub async fn create_update_stream(
     }
 
-    // if the stream not found in memory map,
-    //check if it exists in the storage
-    //create stream and schema from storage
     if !metadata::STREAM_INFO.stream_exists(stream_name)
         && CONFIG.parseable.mode == Mode::Query
         && create_stream_and_schema_from_storage(stream_name).await?
     {
         return Err(StreamError::Custom {
-        msg: format!(
-            "Logstream {stream_name} already exists, please create a new log stream with unique name"
-        ),
-        status: StatusCode::BAD_REQUEST,
-    });
+            msg: format!(
+                "Logstream {stream_name} already exists, please create a new log stream with unique name"
+            ),
+            status: StatusCode::BAD_REQUEST,
+        });
     }
 
-    if update_stream == "true" {
-        if !STREAM_INFO.stream_exists(stream_name) {
-            return Err(StreamError::StreamNotFound(stream_name.to_string()));
-        }
-        if !time_partition.is_empty() {
-            return Err(StreamError::Custom {
-                msg: "Altering the time partition of an existing stream is restricted.".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
-
-        if !static_schema_flag.is_empty() {
-            return Err(StreamError::Custom {
-                msg: "Altering the schema of an existing stream is restricted.".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
+    if update_stream_flag == "true" {
+        return update_stream(
+            req,
+            stream_name,
+            &time_partition,
+            &static_schema_flag,
+            &time_partition_limit,
+            &custom_partition,
+        )
+        .await;
+    }
 
-        if !time_partition_limit.is_empty() {
-            let time_partition_days = validate_time_partition_limit(&time_partition_limit)?;
-            update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days)
-                .await?;
-            return Ok(req.headers().clone());
-        }
+    let time_partition_in_days = if !time_partition_limit.is_empty() {
+        validate_time_partition_limit(&time_partition_limit)?
+ } else { + "" + }; - if !custom_partition.is_empty() { - validate_custom_partition(&custom_partition)?; - update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?; - } else { - update_custom_partition_in_stream(stream_name.to_string(), "").await?; - } - return Ok(req.headers().clone()); - } - let mut time_partition_in_days = ""; - if !time_partition_limit.is_empty() { - time_partition_in_days = validate_time_partition_limit(&time_partition_limit)?; - } if !custom_partition.is_empty() { validate_custom_partition(&custom_partition)?; } @@ -141,6 +119,51 @@ pub async fn create_update_stream( Ok(req.headers().clone()) } +async fn update_stream( + req: &HttpRequest, + stream_name: &str, + time_partition: &str, + static_schema_flag: &str, + time_partition_limit: &str, + custom_partition: &str, +) -> Result { + if !STREAM_INFO.stream_exists(stream_name) { + return Err(StreamError::StreamNotFound(stream_name.to_string())); + } + if !time_partition.is_empty() { + return Err(StreamError::Custom { + msg: "Altering the time partition of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + if !static_schema_flag.is_empty() { + return Err(StreamError::Custom { + msg: "Altering the schema of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + if !time_partition_limit.is_empty() { + let time_partition_days = validate_time_partition_limit(time_partition_limit)?; + update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days).await?; + return Ok(req.headers().clone()); + } + validate_and_update_custom_partition(stream_name, custom_partition).await?; + return Ok(req.headers().clone()); +} + +async fn validate_and_update_custom_partition( + stream_name: &str, + custom_partition: &str, +) -> Result<(), StreamError> { + if !custom_partition.is_empty() { + validate_custom_partition(custom_partition)?; + update_custom_partition_in_stream(stream_name.to_string(), custom_partition).await?; + } else { + update_custom_partition_in_stream(stream_name.to_string(), "").await?; + } + Ok(()) +} + pub fn fetch_headers_from_put_stream_request( req: &HttpRequest, ) -> (String, String, String, String, String, String) { From 5593ed8f79252ace829f43d1762d6e1259458ee7 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 27 Nov 2024 14:31:22 +0530 Subject: [PATCH 8/8] fix: side-effect in test --- server/src/handlers/http/logstream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 5304c18c2..84c500709 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -385,7 +385,7 @@ pub async fn get_stats(req: HttpRequest) -> Result // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if CONFIG.parseable.mode == Mode::Query { + if cfg!(not(test)) && CONFIG.parseable.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),