From 70937d45b877c212857d7d7b2b8cf5a8d65c2396 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 10 Apr 2024 14:25:17 +0530 Subject: [PATCH 1/2] multiple fixes on server: 1. fixed banner spacing 2. modified server mode: All to Standalone, Ingest to Distributed (Ingest), Query to Distributed (Query) 3. updated server mode in about API response 4. updated logic for env var P_INGESTOR_URL to use HOSTNAME and PORT from env 5. remvoed put cache api from querier 6. added put cache api to ingester --- server/src/about.rs | 6 ++-- server/src/banner.rs | 18 +++++----- server/src/catalog.rs | 6 ++-- server/src/cli.rs | 3 +- server/src/handlers/http/about.rs | 2 +- .../src/handlers/http/modal/ingest_server.rs | 12 ++++++- server/src/handlers/http/modal/server.rs | 16 --------- server/src/metrics/prom_utils.rs | 5 ++- server/src/option.rs | 8 +++++ server/src/storage/object_storage.rs | 22 ++++++++---- server/src/storage/staging.rs | 18 +++++----- server/src/utils.rs | 35 ++++++++++++++----- 12 files changed, 89 insertions(+), 62 deletions(-) diff --git a/server/src/about.rs b/server/src/about.rs index 9aea9ff28..f6680c919 100644 --- a/server/src/about.rs +++ b/server/src/about.rs @@ -90,7 +90,7 @@ pub fn print_about( eprint!( " {} - Version:\t\t\t\t\t\"v{}\"", + Version: \"v{}\"", "About:".to_string().bold(), current_version, ); // " " " " @@ -103,8 +103,8 @@ pub fn print_about( eprintln!( " - Commit:\t\t\t\t\t\t\"{commit_hash}\" - Docs:\t\t\t\t\t\t\"https://logg.ing/docs\"" + Commit: \"{commit_hash}\" + Docs: \"https://logg.ing/docs\"" ); } diff --git a/server/src/banner.rs b/server/src/banner.rs index d9f3cc609..ca665ffa4 100644 --- a/server/src/banner.rs +++ b/server/src/banner.rs @@ -77,14 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) { eprintln!( " {} - Address:\t\t\t\t\t{} - Credentials:\t\t\t\t\t{} - Server Mode:\t\t\t\t\t\"{}\" - LLM Status:\t\t\t\t\t\"{}\"", + Address: {} + Credentials: {} + Server Mode: \"{}\" + LLM Status: \"{}\"", "Server:".to_string().bold(), address, credentials, - config.parseable.mode.to_str(), + config.get_server_mode_string(), llm_status ); } @@ -101,8 +101,8 @@ async fn storage_info(config: &Config) { eprintln!( " {} - Storage Mode:\t\t\t\t\t\"{}\" - Staging Path:\t\t\t\t\t\"{}\"", + Storage Mode: \"{}\" + Staging Path: \"{}\"", "Storage:".to_string().bold(), config.get_storage_mode_string(), config.staging_dir().to_string_lossy(), @@ -116,7 +116,7 @@ async fn storage_info(config: &Config) { eprintln!( "\ - {:8}Cache:\t\t\t\t\t\"{}\", (size: {})", + {:8}Cache: \"{}\", (size: {})", "", path.display(), size @@ -125,7 +125,7 @@ async fn storage_info(config: &Config) { eprintln!( "\ - {:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})", + {:8}Store: \"{}\", (latency: {:?})", "", storage.get_endpoint(), latency diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 0e8716dce..1d0270a88 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -118,7 +118,7 @@ pub async fn update_snapshot( let mut ch = false; for m in manifests.iter() { let s = get_address(); - let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE); + let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE); if m.manifest_path.contains(&p) { ch = true; } @@ -152,7 +152,7 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE); + let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage @@ -186,7 +186,7 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE); + let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) diff --git a/server/src/cli.rs b/server/src/cli.rs index b02c8a225..41e03b39f 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -322,7 +322,6 @@ impl Cli { .env("P_INGESTOR_URL") .value_name("URL") .required(false) - .value_parser(validation::socket_addr) .help("URL to connect to this specific ingestor. Default is the address of the server.") ) .arg( @@ -371,7 +370,7 @@ impl FromArgMatches for Cli { self.ingestor_url = m .get_one::(Self::INGESTOR_URL) .cloned() - .unwrap_or_else(|| self.address.clone()); + .unwrap_or_else(String::default); self.local_staging_path = m .get_one::(Self::STAGING) diff --git a/server/src/handlers/http/about.rs b/server/src/handlers/http/about.rs index 347cd0d3f..1603139ff 100644 --- a/server/src/handlers/http/about.rs +++ b/server/src/handlers/http/about.rs @@ -65,7 +65,7 @@ pub async fn about() -> Json { let current_version = format!("v{}", current_release.released_version); let commit = current_release.commit_hash; let deployment_id = meta.deployment_id.to_string(); - let mode = CONFIG.parseable.mode.to_str(); + let mode = CONFIG.get_server_mode_string(); let staging = if CONFIG.parseable.mode == Mode::Query { "".to_string() } else { diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 5c0334a98..3fb54e987 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -39,6 +39,7 @@ use super::OpenIdClient; use super::ParseableServer; use super::DEFAULT_VERSION; +use crate::utils::get_address; use actix_web::body::MessageBody; use actix_web::Scope; use actix_web::{web, App, HttpServer}; @@ -196,6 +197,15 @@ impl IngestServer { .to(logstream::get_stats) .authorize_for_stream(Action::GetStats), ), + ) + .service( + web::resource("/cache") + // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_enable_cache) + .authorize_for_stream(Action::PutCacheEnabled), + ), ), ) } @@ -204,7 +214,7 @@ impl IngestServer { async fn set_ingester_metadata(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); - let sock = Server::get_server_address(); + let sock = get_address(); let path = ingester_metadata_path(sock.ip().to_string(), sock.port().to_string()); if store.get_object(&path).await.is_ok() { diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 89cd3aa59..df9cd0afc 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -32,7 +32,6 @@ use crate::migration; use crate::rbac; use crate::storage; use crate::sync; -use std::net::SocketAddr; use std::{fs::File, io::BufReader, sync::Arc}; use actix_web::web::resource; @@ -270,12 +269,6 @@ impl Server { ) .service( web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ) // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream .route( web::get() @@ -475,13 +468,4 @@ impl Server { }; } } - - #[inline(always)] - pub fn get_server_address() -> SocketAddr { - // this might cause an issue down the line - // best is to make the Cli Struct better, but thats a chore - (CONFIG.parseable.ingestor_url.clone()) - .parse::() - .unwrap() - } } diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index 72442a96c..6fb99405d 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -1,11 +1,10 @@ +use crate::utils::get_address; use prometheus_parse::Sample as PromSample; use prometheus_parse::Value as PromValue; use serde::Serialize; use serde_json::Error as JsonError; use serde_json::Value as JsonValue; -use crate::handlers::http::modal::server::Server; - #[derive(Debug, Serialize, Clone)] pub struct Metrics { address: String, @@ -23,7 +22,7 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { - let socket = Server::get_server_address(); + let socket = get_address(); let address = format!("http://{}:{}", socket.ip(), socket.port()); Metrics { address, diff --git a/server/src/option.rs b/server/src/option.rs index dd92eee2f..1983fdd07 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -165,6 +165,14 @@ impl Config { } "S3 bucket" } + + pub fn get_server_mode_string(&self) -> &str { + match self.parseable.mode { + Mode::Query => "Distributed (Query)", + Mode::Ingest => "Distributed (Ingest)", + Mode::All => "Standalone", + } + } } fn create_parseable_cli_command() -> Command { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 9efa477a2..b61ccd619 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -474,7 +474,7 @@ pub trait ObjectStorage: Sync + 'static { cache_updates .entry(stream) .or_default() - .push((absolute_path, file)) + .push((absolute_path, file)); } else { let _ = fs::remove_file(file); } @@ -539,8 +539,13 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes { fn schema_path(stream_name: &str) -> RelativePathBuf { match CONFIG.parseable.mode { Mode::Ingest => { - let (ip, port) = get_address(); - let file_name = format!(".ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME); + let addr = get_address(); + let file_name = format!( + ".ingester.{}.{}{}", + addr.ip(), + addr.port(), + SCHEMA_FILE_NAME + ); RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) } @@ -554,8 +559,13 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { match &CONFIG.parseable.mode { Mode::Ingest => { - let (ip, port) = get_address(); - let file_name = format!(".ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME); + let addr = get_address(); + let file_name = format!( + ".ingester.{}.{}{}", + addr.ip(), + addr.port(), + STREAM_METADATA_FILE_NAME + ); RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) } Mode::Query | Mode::All => RelativePathBuf::from_iter([ @@ -580,7 +590,7 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { #[inline(always)] fn manifest_path(prefix: &str) -> RelativePathBuf { let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE); + let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); RelativePathBuf::from_iter([prefix, &mainfest_file_name]) } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index cab1a51b1..f2f42a901 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -25,6 +25,13 @@ use std::{ sync::Arc, }; +use crate::{ + event::DEFAULT_TIMESTAMP_KEY, + metrics, + option::CONFIG, + storage::OBJECT_STORE_DATA_GRANULARITY, + utils::{self, arrow::merged_reader::MergedReverseRecordReader, get_address}, +}; use arrow_schema::{ArrowError, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use parquet::{ @@ -36,15 +43,6 @@ use parquet::{ schema::types::ColumnPath, }; -use super::super::handlers::http::modal::server::Server; -use crate::{ - event::DEFAULT_TIMESTAMP_KEY, - metrics, - option::CONFIG, - storage::OBJECT_STORE_DATA_GRANULARITY, - utils::{self, arrow::merged_reader::MergedReverseRecordReader}, -}; - const ARROW_FILE_EXTENSION: &str = "data.arrows"; const PARQUET_FILE_EXTENSION: &str = "data.parquet"; @@ -65,7 +63,7 @@ impl StorageDir { + &utils::hour_to_prefix(time.hour()) + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); - let sock = Server::get_server_address(); + let sock = get_address(); let ip = sock.ip(); let port = sock.port(); format!("{local_uri}{ip}.{port}.{extention}") diff --git a/server/src/utils.rs b/server/src/utils.rs index 5cbe596d2..7ad6a14e1 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -22,12 +22,10 @@ pub mod header_parsing; pub mod json; pub mod uid; pub mod update; - -use std::net::{IpAddr, SocketAddr}; - -use chrono::{DateTime, NaiveDate, Timelike, Utc}; - use crate::option::CONFIG; +use chrono::{DateTime, NaiveDate, Timelike, Utc}; +use std::env; +use std::net::SocketAddr; #[allow(dead_code)] pub fn hostname() -> Option { @@ -227,9 +225,30 @@ impl TimePeriod { } #[inline(always)] -pub fn get_address() -> (IpAddr, u16) { - let addr = CONFIG.parseable.ingestor_url.parse::().unwrap(); - (addr.ip(), addr.port()) +pub fn get_address() -> SocketAddr { + if CONFIG.parseable.ingestor_url.is_empty() { + CONFIG.parseable.address.parse::().unwrap() + } else { + let addr_from_env = CONFIG + .parseable + .ingestor_url + .split(':') + .collect::>(); + + let mut hostname = addr_from_env[0].to_string(); + let mut port = addr_from_env[1].to_string(); + if hostname.starts_with('$') && port.starts_with('$') { + hostname = get_from_env("HOSTNAME"); + port = get_from_env("PORT"); + let addr = format!("{}:{}", hostname, port); + addr.parse::().unwrap() + } else { + CONFIG.parseable.ingestor_url.parse::().unwrap() + } + } +} +fn get_from_env(var_to_fetch: &str) -> String { + env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) } #[cfg(test)] From 9a4816955e83a28cdcba42bdefac5680a22507fb Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 10 Apr 2024 19:11:13 +0530 Subject: [PATCH 2/2] added below fixes: 1. renamed ingester to ingestor 2. corrected cache flow for ingestors and standalone 3. removed query, other logstream apis for ingestors --- server/src/analytics.rs | 40 +++---- server/src/handlers/http.rs | 8 +- server/src/handlers/http/cluster/mod.rs | 108 +++++++++--------- server/src/handlers/http/cluster/utils.rs | 32 +++--- server/src/handlers/http/logstream.rs | 33 ++++-- .../src/handlers/http/modal/ingest_server.rs | 104 ++++++----------- server/src/handlers/http/modal/mod.rs | 12 +- .../src/handlers/http/modal/query_server.rs | 8 +- server/src/handlers/http/modal/server.rs | 9 ++ server/src/handlers/http/query.rs | 4 +- server/src/migration.rs | 2 +- server/src/query/stream_schema_provider.rs | 2 +- server/src/rbac/role.rs | 8 +- server/src/storage/localfs.rs | 16 +-- server/src/storage/object_storage.rs | 14 +-- server/src/storage/s3.rs | 18 +-- 16 files changed, 207 insertions(+), 211 deletions(-) diff --git a/server/src/analytics.rs b/server/src/analytics.rs index f4e0988d8..9a76d0db7 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -64,8 +64,8 @@ pub struct Report { server_mode: String, version: String, commit_hash: String, - active_ingesters: u64, - inactive_ingesters: u64, + active_ingestors: u64, + inactive_ingestors: u64, stream_count: usize, total_events_count: u64, total_json_bytes: u64, @@ -91,7 +91,7 @@ impl Report { cpu_count = info.cpus().len(); mem_total = info.total_memory(); } - let ingester_metrics = fetch_ingesters_metrics().await; + let ingestor_metrics = fetch_ingestors_metrics().await; Self { deployment_id: storage::StorageMetadata::global().deployment_id, @@ -106,12 +106,12 @@ impl Report { server_mode: CONFIG.parseable.mode.to_string(), version: current().released_version.to_string(), commit_hash: current().commit_hash, - active_ingesters: ingester_metrics.0, - inactive_ingesters: ingester_metrics.1, - stream_count: ingester_metrics.2, - total_events_count: ingester_metrics.3, - total_json_bytes: ingester_metrics.4, - total_parquet_bytes: ingester_metrics.5, + active_ingestors: ingestor_metrics.0, + inactive_ingestors: ingestor_metrics.1, + stream_count: ingestor_metrics.2, + total_events_count: ingestor_metrics.3, + total_json_bytes: ingestor_metrics.4, + total_parquet_bytes: ingestor_metrics.5, metrics: build_metrics().await, } } @@ -122,7 +122,7 @@ impl Report { } } -/// build the node metrics for the node ingester endpoint +/// build the node metrics for the node ingestor endpoint pub async fn get_analytics(_: HttpRequest) -> impl Responder { let json = NodeMetrics::build(); web::Json(json) @@ -148,23 +148,23 @@ fn total_event_stats() -> (u64, u64, u64) { (total_events, total_json_bytes, total_parquet_bytes) } -async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) { +async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) { let event_stats = total_event_stats(); let mut node_metrics = NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2); let mut vec = vec![]; - let mut active_ingesters = 0u64; - let mut offline_ingesters = 0u64; + let mut active_ingestors = 0u64; + let mut offline_ingestors = 0u64; if CONFIG.parseable.mode == Mode::Query { // send analytics for ingest servers - // ingester infos should be valid here, if not some thing is wrong - let ingester_infos = cluster::get_ingester_info().await.unwrap(); + // ingestor infos should be valid here, if not some thing is wrong + let ingestor_infos = cluster::get_ingestor_info().await.unwrap(); - for im in ingester_infos { + for im in ingestor_infos { if !check_liveness(&im.domain_name).await { - offline_ingesters += 1; + offline_ingestors += 1; continue; } @@ -185,15 +185,15 @@ async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) { let data = serde_json::from_slice::(&resp.bytes().await.unwrap()).unwrap(); vec.push(data); - active_ingesters += 1; + active_ingestors += 1; } node_metrics.accumulate(&mut vec); } ( - active_ingesters, - offline_ingesters, + active_ingestors, + offline_ingestors, node_metrics.stream_count, node_metrics.total_events_count, node_metrics.total_json_bytes, diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index a3b3f4fc7..a1f506b29 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -23,7 +23,7 @@ use serde_json::Value; use crate::option::CONFIG; -use self::{cluster::get_ingester_info, query::Query}; +use self::{cluster::get_ingestor_info, query::Query}; pub(crate) mod about; pub mod cluster; @@ -94,10 +94,10 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result anyhow::Result> { - // send the query request to the ingester +pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result> { + // send the query request to the ingestor let mut res = vec![]; - let ima = get_ingester_info().await.unwrap(); + let ima = get_ingestor_info().await.unwrap(); for im in ima.iter() { let uri = format!( diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 5bc5f8719..58f3f79f8 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -27,7 +27,7 @@ use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; -use crate::storage::object_storage::ingester_metadata_path; +use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; use actix_web::http::header; @@ -40,39 +40,39 @@ use relative_path::RelativePathBuf; use serde_json::Value as JsonValue; use url::Url; -type IngesterMetadataArr = Vec; +type IngestorMetadataArr = Vec; use self::utils::StorageStats; use super::base_path_without_preceding_slash; -use super::modal::IngesterMetadata; +use super::modal::IngestorMetadata; -// forward the request to all ingesters to keep them in sync +// forward the request to all ingestors to keep them in sync #[allow(dead_code)] -pub async fn sync_streams_with_ingesters( +pub async fn sync_streams_with_ingestors( stream_name: &str, time_partition: &str, static_schema: &str, schema: Bytes, ) -> Result<(), StreamError> { - let ingester_infos = get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; let mut errored = false; - for ingester in ingester_infos.iter() { + for ingestor in ingestor_infos.iter() { let url = format!( "{}{}/logstream/{}", - ingester.domain_name, + ingestor.domain_name, base_path_without_preceding_slash(), stream_name ); match send_stream_sync_request( &url, - ingester.clone(), + ingestor.clone(), time_partition, static_schema, schema.clone(), @@ -88,21 +88,21 @@ pub async fn sync_streams_with_ingesters( } if errored { - for ingester in ingester_infos { + for ingestor in ingestor_infos { let url = format!( "{}{}/logstream/{}", - ingester.domain_name, + ingestor.domain_name, base_path_without_preceding_slash(), stream_name ); // roll back the stream creation - send_stream_rollback_request(&url, ingester.clone()).await?; + send_stream_rollback_request(&url, ingestor.clone()).await?; } // this might be a bit too much return Err(StreamError::Custom { - msg: "Failed to sync stream with ingesters".to_string(), + msg: "Failed to sync stream with ingestors".to_string(), status: StatusCode::INTERNAL_SERVER_ERROR, }); } @@ -110,8 +110,8 @@ pub async fn sync_streams_with_ingesters( Ok(()) } -/// get the cumulative stats from all ingesters -pub async fn fetch_stats_from_ingesters( +/// get the cumulative stats from all ingestors +pub async fn fetch_stats_from_ingestors( stream_name: &str, ) -> Result, StreamError> { let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); @@ -120,7 +120,7 @@ pub async fn fetch_stats_from_ingesters( .get_object_store() .get_objects( Some(&path), - Box::new(|file_name| file_name.starts_with(".ingester")), + Box::new(|file_name| file_name.starts_with(".ingestor")), ) .await?; let mut ingestion_size = 0u64; @@ -147,12 +147,12 @@ pub async fn fetch_stats_from_ingesters( #[allow(dead_code)] async fn send_stream_sync_request( url: &str, - ingester: IngesterMetadata, + ingestor: IngestorMetadata, time_partition: &str, static_schema: &str, schema: Bytes, ) -> Result<(), StreamError> { - if !utils::check_liveness(&ingester.domain_name).await { + if !utils::check_liveness(&ingestor.domain_name).await { return Ok(()); } @@ -162,14 +162,14 @@ async fn send_stream_sync_request( .header(header::CONTENT_TYPE, "application/json") .header(TIME_PARTITION_KEY, time_partition) .header(STATIC_SCHEMA_FLAG, static_schema) - .header(header::AUTHORIZATION, ingester.token) + .header(header::AUTHORIZATION, ingestor.token) .body(schema) .send() .await .map_err(|err| { log::error!( - "Fatal: failed to forward create stream request to ingester: {}\n Error: {:?}", - ingester.domain_name, + "Fatal: failed to forward create stream request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, err ); StreamError::Network(err) @@ -177,8 +177,8 @@ async fn send_stream_sync_request( if !res.status().is_success() { log::error!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name, + "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, res ); return Err(StreamError::Network(res.error_for_status().unwrap_err())); @@ -187,13 +187,13 @@ async fn send_stream_sync_request( Ok(()) } -/// send a rollback request to all ingesters +/// send a rollback request to all ingestors #[allow(dead_code)] async fn send_stream_rollback_request( url: &str, - ingester: IngesterMetadata, + ingestor: IngestorMetadata, ) -> Result<(), StreamError> { - if !utils::check_liveness(&ingester.domain_name).await { + if !utils::check_liveness(&ingestor.domain_name).await { return Ok(()); } @@ -201,14 +201,14 @@ async fn send_stream_rollback_request( let resp = client .delete(url) .header(header::CONTENT_TYPE, "application/json") - .header(header::AUTHORIZATION, ingester.token) + .header(header::AUTHORIZATION, ingestor.token) .send() .await .map_err(|err| { // log the error and return a custom error log::error!( "Fatal: failed to rollback stream creation: {}\n Error: {:?}", - ingester.domain_name, + ingestor.domain_name, err ); StreamError::Network(err) @@ -219,13 +219,13 @@ async fn send_stream_rollback_request( if !resp.status().is_success() { log::error!( "failed to rollback stream creation: {}\nResponse Returned: {:?}", - ingester.domain_name, + ingestor.domain_name, resp ); return Err(StreamError::Custom { msg: format!( "failed to rollback stream creation: {}\nResponse Returned: {:?}", - ingester.domain_name, + ingestor.domain_name, resp.text().await.unwrap_or_default() ), status: StatusCode::INTERNAL_SERVER_ERROR, @@ -236,24 +236,24 @@ async fn send_stream_rollback_request( } pub async fn get_cluster_info() -> Result { - let ingester_infos = get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; let mut infos = vec![]; - for ingester in ingester_infos { + for ingestor in ingestor_infos { let uri = Url::parse(&format!( "{}{}/about", - ingester.domain_name, + ingestor.domain_name, base_path_without_preceding_slash() )) .expect("should always be a valid url"); let resp = reqwest::Client::new() .get(uri) - .header(header::AUTHORIZATION, ingester.token.clone()) + .header(header::AUTHORIZATION, ingestor.token.clone()) .header(header::CONTENT_TYPE, "application/json") .send() .await; @@ -262,13 +262,13 @@ pub async fn get_cluster_info() -> Result { let status = Some(resp.status().to_string()); let resp_data = resp.bytes().await.map_err(|err| { - log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err); + log::error!("Fatal: failed to parse ingestor info to bytes: {:?}", err); StreamError::Network(err) })?; let sp = serde_json::from_slice::(&resp_data) .map_err(|err| { - log::error!("Fatal: failed to parse ingester info: {:?}", err); + log::error!("Fatal: failed to parse ingestor info: {:?}", err); StreamError::SerdeError(err) })? .get("staging") @@ -288,7 +288,7 @@ pub async fn get_cluster_info() -> Result { }; infos.push(utils::ClusterInfo::new( - &ingester.domain_name, + &ingestor.domain_name, reachable, staging_path, CONFIG.storage().get_endpoint(), @@ -301,17 +301,17 @@ pub async fn get_cluster_info() -> Result { } pub async fn get_cluster_metrics() -> Result { - let ingester_metadata = get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); + let ingestor_metadata = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); PostError::Invalid(err) })?; let mut dresses = vec![]; - for ingester in ingester_metadata { + for ingestor in ingestor_metadata { let uri = Url::parse(&format!( "{}{}/metrics", - &ingester.domain_name, + &ingestor.domain_name, base_path_without_preceding_slash() )) .unwrap(); @@ -333,12 +333,12 @@ pub async fn get_cluster_metrics() -> Result { dresses.push(Metrics::from_prometheus_samples( sample, - ingester.domain_name, + ingestor.domain_name, )); } else { log::warn!( - "Failed to fetch metrics from ingester: {}\n", - ingester.domain_name, + "Failed to fetch metrics from ingestor: {}\n", + ingestor.domain_name, ); } } @@ -346,27 +346,27 @@ pub async fn get_cluster_metrics() -> Result { Ok(actix_web::HttpResponse::Ok().json(dresses)) } -// update the .query.json file and return the new IngesterMetadataArr -pub async fn get_ingester_info() -> anyhow::Result { +// update the .query.json file and return the new ingestorMetadataArr +pub async fn get_ingestor_info() -> anyhow::Result { let store = CONFIG.storage().get_object_store(); let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); let arr = store .get_objects( Some(&root_path), - Box::new(|file_name| file_name.starts_with("ingester")), + Box::new(|file_name| file_name.starts_with("ingestor")), ) .await? .iter() // this unwrap will most definateley shoot me in the foot later - .map(|x| serde_json::from_slice::(x).unwrap_or_default()) + .map(|x| serde_json::from_slice::(x).unwrap_or_default()) .collect_vec(); Ok(arr) } -pub async fn remove_ingester(req: HttpRequest) -> Result { - let domain_name: String = req.match_info().get("ingester").unwrap().parse().unwrap(); +pub async fn remove_ingestor(req: HttpRequest) -> Result { + let domain_name: String = req.match_info().get("ingestor").unwrap().parse().unwrap(); let domain_name = to_url_string(domain_name); if check_liveness(&domain_name).await { @@ -374,14 +374,14 @@ pub async fn remove_ingester(req: HttpRequest) -> Result { diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index 761177d45..579ffa99c 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -16,7 +16,7 @@ * */ -use crate::handlers::http::{logstream::error::StreamError, modal::IngesterMetadata}; +use crate::handlers::http::{logstream::error::StreamError, modal::IngestorMetadata}; use actix_web::http::header; use chrono::{DateTime, Utc}; use http::StatusCode; @@ -55,8 +55,8 @@ pub struct ClusterInfo { reachable: bool, staging_path: String, storage_path: String, - error: Option, // error message if the ingester is not reachable - status: Option, // status message if the ingester is reachable + error: Option, // error message if the ingestor is not reachable + status: Option, // status message if the ingestor is reachable } impl ClusterInfo { @@ -197,14 +197,14 @@ pub async fn check_liveness(domain_name: &str) -> bool { reqw.is_ok() } -/// send a request to the ingester to fetch its stats +/// send a request to the ingestor to fetch its stats /// dead for now #[allow(dead_code)] pub async fn send_stats_request( url: &str, - ingester: IngesterMetadata, + ingestor: IngestorMetadata, ) -> Result, StreamError> { - if !check_liveness(&ingester.domain_name).await { + if !check_liveness(&ingestor.domain_name).await { return Ok(None); } @@ -212,13 +212,13 @@ pub async fn send_stats_request( let res = client .get(url) .header(header::CONTENT_TYPE, "application/json") - .header(header::AUTHORIZATION, ingester.token) + .header(header::AUTHORIZATION, ingestor.token) .send() .await .map_err(|err| { log::error!( - "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", - ingester.domain_name, + "Fatal: failed to fetch stats from ingestor: {}\n Error: {:?}", + ingestor.domain_name, err ); @@ -227,14 +227,14 @@ pub async fn send_stats_request( if !res.status().is_success() { log::error!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name, + "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, res ); return Err(StreamError::Custom { msg: format!( - "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}", - ingester.domain_name, + "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, res.text().await.unwrap_or_default() ), status: StatusCode::INTERNAL_SERVER_ERROR, @@ -247,16 +247,16 @@ pub async fn send_stats_request( /// domain_name needs to be http://ip:port /// dead code for now #[allow(dead_code)] -pub fn ingester_meta_filename(domain_name: &str) -> String { +pub fn ingestor_meta_filename(domain_name: &str) -> String { if domain_name.starts_with("http://") | domain_name.starts_with("https://") { let url = Url::parse(domain_name).unwrap(); return format!( - "ingester.{}.{}.json", + "ingestor.{}.{}.json", url.host_str().unwrap(), url.port().unwrap() ); } - format!("ingester.{}.json", domain_name) + format!("ingestor.{}.json", domain_name) } pub fn to_url_string(str: String) -> String { diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 1fa728d81..827e0729e 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -26,7 +26,7 @@ use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; -use super::cluster::fetch_stats_from_ingesters; +use super::cluster::fetch_stats_from_ingestors; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; @@ -286,7 +286,26 @@ pub async fn put_enable_cache( if CONFIG.parseable.local_cache_path.is_none() { return Err(StreamError::CacheNotEnabled(stream_name)); } - + if CONFIG.parseable.mode == Mode::Ingest { + // here the ingest server has not found the stream + // so it should check if the stream exists in storage + let streams = storage.list_streams().await?; + if !streams.contains(&LogStream { + name: stream_name.clone().to_owned(), + }) { + log::error!("Stream {} not found", stream_name.clone()); + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + metadata::STREAM_INFO + .upsert_stream_info( + &*storage, + LogStream { + name: stream_name.clone().to_owned(), + }, + ) + .await + .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; + } let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?; stream_metadata.cache_enabled = enable_cache; storage @@ -310,8 +329,8 @@ pub async fn get_stats(req: HttpRequest) -> Result let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let ingester_stats = if CONFIG.parseable.mode == Mode::Query { - Some(fetch_stats_from_ingesters(&stream_name).await?) + let ingestor_stats = if CONFIG.parseable.mode == Mode::Query { + Some(fetch_stats_from_ingestors(&stream_name).await?) } else { None }; @@ -348,9 +367,9 @@ pub async fn get_stats(req: HttpRequest) -> Result QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) } }; - let stats = if let Some(mut ingester_stats) = ingester_stats { - ingester_stats.push(stats); - merge_quried_stats(ingester_stats) + let stats = if let Some(mut ingestor_stats) = ingestor_stats { + ingestor_stats.push(stats); + merge_quried_stats(ingestor_stats) } else { stats }; diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 3fb54e987..06188f5ff 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -20,21 +20,20 @@ use crate::analytics; use crate::banner; use crate::handlers::http::logstream; use crate::handlers::http::middleware::RouteExt; -use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE; use crate::localcache::LocalCacheManager; use crate::metadata; use crate::metrics; use crate::rbac; use crate::rbac::role::Action; use crate::storage; -use crate::storage::object_storage::ingester_metadata_path; +use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::object_storage::parseable_json_path; use crate::storage::ObjectStorageError; use crate::sync; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; -use super::IngesterMetadata; +use super::IngestorMetadata; use super::OpenIdClient; use super::ParseableServer; use super::DEFAULT_VERSION; @@ -66,8 +65,8 @@ impl ParseableServer for IngestServer { prometheus: PrometheusMetrics, _oidc_client: Option, ) -> anyhow::Result<()> { - // set the ingester metadata - self.set_ingester_metadata().await?; + // set the ingestor metadata + self.set_ingestor_metadata().await?; // get the ssl stuff let ssl = get_ssl_acceptor( @@ -136,7 +135,6 @@ impl IngestServer { .service( // Base path "{url}/api/v1" web::scope(&base_path()) - .service(Server::get_query_factory()) .service(Server::get_ingest_factory()) .service(Self::logstream_api()) .service(Server::get_about_factory()) @@ -158,72 +156,42 @@ impl IngestServer { } fn logstream_api() -> Scope { - web::scope("/logstream") - .service( - // GET "/logstream" ==> Get list of all Log Streams on the server - web::resource("") - .route(web::get().to(logstream::list).authorize(Action::ListStream)), - ) - .service( - web::scope("/{logstream}") - .service( - web::resource("") - // PUT "/logstream/{logstream}" ==> Create log stream - .route( - web::put() - .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), - ) - // DELETE "/logstream/{logstream}" ==> Delete log stream - .route( - web::delete() - .to(logstream::delete) - .authorize_for_stream(Action::DeleteStream), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) - .service( - // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource("/schema").route( - web::get() - .to(logstream::schema) - .authorize_for_stream(Action::GetSchema), - ), - ) - .service( - // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream - web::resource("/stats").route( - web::get() - .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), - ), - ) - .service( - web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ), + web::scope("/logstream").service( + web::scope("/{logstream}") + .service( + // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream + web::resource("/stats").route( + web::get() + .to(logstream::get_stats) + .authorize_for_stream(Action::GetStats), ), - ) + ) + .service( + web::resource("/cache") + // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_enable_cache) + .authorize_for_stream(Action::PutCacheEnabled), + ), + ), + ) } - // create the ingester metadata and put the .ingester.json file in the object store - async fn set_ingester_metadata(&self) -> anyhow::Result<()> { + // create the ingestor metadata and put the .ingestor.json file in the object store + async fn set_ingestor_metadata(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); let sock = get_address(); - let path = ingester_metadata_path(sock.ip().to_string(), sock.port().to_string()); + let path = ingestor_metadata_path(sock.ip().to_string(), sock.port().to_string()); if store.get_object(&path).await.is_ok() { - println!("Ingester metadata already exists"); + println!("ingestor metadata already exists"); return Ok(()); }; let scheme = CONFIG.parseable.get_scheme(); - let resource = IngesterMetadata::new( + let resource = IngestorMetadata::new( sock.port().to_string(), CONFIG .parseable @@ -250,7 +218,7 @@ impl IngestServer { } // check for querier state. Is it there, or was it there in the past - // this should happen before the set the ingester metadata + // this should happen before the set the ingestor metadata async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> { // how do we check for querier state? // based on the work flow of the system, the querier will always need to start first @@ -272,19 +240,19 @@ impl IngestServer { // check if your creds match with others let store = CONFIG.storage().get_object_store(); let base_path = RelativePathBuf::from(""); - let ingester_metadata = store + let ingestor_metadata = store .get_objects( Some(&base_path), - Box::new(|file_name| file_name.starts_with("ingester")), + Box::new(|file_name| file_name.starts_with("ingestor")), ) .await? .iter() // this unwrap will most definateley shoot me in the foot later - .map(|x| serde_json::from_slice::(x).unwrap_or_default()) + .map(|x| serde_json::from_slice::(x).unwrap_or_default()) .collect_vec(); - if !ingester_metadata.is_empty() { - let check = ingester_metadata[0].token.clone(); + if !ingestor_metadata.is_empty() { + let check = ingestor_metadata[0].token.clone(); let token = base64::prelude::BASE64_STANDARD.encode(format!( "{}:{}", @@ -294,8 +262,8 @@ impl IngestServer { let token = format!("Basic {}", token); if check != token { - log::error!("Credentials do not match with other ingesters. Please check your credentials and try again."); - return Err(anyhow::anyhow!("Credentials do not match with other ingesters. Please check your credentials and try again.")); + log::error!("Credentials do not match with other ingestors. Please check your credentials and try again."); + return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again.")); } } diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs index 5881b3bfb..29ef214ff 100644 --- a/server/src/handlers/http/modal/mod.rs +++ b/server/src/handlers/http/modal/mod.rs @@ -55,7 +55,7 @@ pub trait ParseableServer { } #[derive(Serialize, Debug, Deserialize, Default, Clone, Eq, PartialEq)] -pub struct IngesterMetadata { +pub struct IngestorMetadata { pub version: String, pub port: String, pub domain_name: String, @@ -63,7 +63,7 @@ pub struct IngesterMetadata { pub token: String, } -impl IngesterMetadata { +impl IngestorMetadata { pub fn new( port: String, domain_name: String, @@ -91,11 +91,11 @@ mod test { use actix_web::body::MessageBody; use rstest::rstest; - use super::{IngesterMetadata, DEFAULT_VERSION}; + use super::{IngestorMetadata, DEFAULT_VERSION}; #[rstest] fn test_deserialize_resource() { - let lhs: IngesterMetadata = IngesterMetadata::new( + let lhs: IngestorMetadata = IngestorMetadata::new( "8000".to_string(), "https://localhost:8000".to_string(), DEFAULT_VERSION.to_string(), @@ -104,14 +104,14 @@ mod test { "admin", ); - let rhs = serde_json::from_slice::(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#).unwrap(); + let rhs = serde_json::from_slice::(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#).unwrap(); assert_eq!(rhs, lhs); } #[rstest] fn test_serialize_resource() { - let im = IngesterMetadata::new( + let im = IngestorMetadata::new( "8000".to_string(), "https://localhost:8000".to_string(), DEFAULT_VERSION.to_string(), diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index d3a67b1a1..8611e9910 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -148,13 +148,13 @@ impl QueryServer { .authorize(Action::ListClusterMetrics), ), ) - // DELETE "/cluster/{ingester_domain:port}" ==> Delete an ingester from the cluster + // DELETE "/cluster/{ingestor_domain:port}" ==> Delete an ingestor from the cluster .service( - web::scope("/{ingester}").service( + web::scope("/{ingestor}").service( web::resource("").route( web::delete() - .to(cluster::remove_ingester) - .authorize(Action::DeleteIngester), + .to(cluster::remove_ingestor) + .authorize(Action::Deleteingestor), ), ), ) diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index df9cd0afc..ceb03b14f 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -267,6 +267,15 @@ impl Server { .authorize_for_stream(Action::GetRetention), ), ) + .service( + web::resource("/cache") + // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_enable_cache) + .authorize_for_stream(Action::PutCacheEnabled), + ), + ) .service( web::resource("/cache") // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 456b6ab3f..39d22f2a0 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -215,7 +215,7 @@ async fn into_query( /// unused for now, might need it in the future #[allow(unused)] -fn transform_query_for_ingester(query: &Query) -> Option { +fn transform_query_for_ingestor(query: &Query) -> Option { if query.query.is_empty() { return None; } @@ -237,7 +237,7 @@ fn transform_query_for_ingester(query: &Query) -> Option { }; let start_time = end_time - chrono::Duration::minutes(1); - // when transforming the query, the ingesters are forced to return an array of values + // when transforming the query, the ingestors are forced to return an array of values let q = Query { query: query.query.clone(), fields: false, diff --git a/server/src/migration.rs b/server/src/migration.rs index 1207bc30a..3b8bd6db2 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -235,7 +235,7 @@ async fn run_meta_file_migration( log::info!("Migrating metadata files to new location"); // get the list of all meta files - let mut meta_files = object_store.get_ingester_meta_file_paths().await?; + let mut meta_files = object_store.get_ingestor_meta_file_paths().await?; meta_files.push(old_meta_file_path); for file in meta_files { diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index a8c29c3b6..2577593c4 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -328,7 +328,7 @@ impl TableProvider for StandardTableProvider { let obs = glob_storage .get_objects( Some(&path), - Box::new(|file_name| file_name.starts_with(".ingester")), + Box::new(|file_name| file_name.starts_with(".ingestor")), ) .await; diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 6636c925c..ee17bea5c 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -47,7 +47,7 @@ pub enum Action { QueryLLM, ListCluster, ListClusterMetrics, - DeleteIngester, + Deleteingestor, All, GetAnalytics, } @@ -106,7 +106,7 @@ impl RoleBuilder { | Action::ListStream | Action::ListCluster | Action::ListClusterMetrics - | Action::DeleteIngester + | Action::Deleteingestor | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema @@ -138,7 +138,7 @@ pub mod model { Admin, Editor, Writer { stream: String }, - Ingester { stream: String }, + Ingestor { stream: String }, Reader { stream: String, tag: Option }, } @@ -157,7 +157,7 @@ pub mod model { } reader } - DefaultPrivilege::Ingester { stream } => { + DefaultPrivilege::Ingestor { stream } => { ingest_perm_builder().with_stream(stream.to_owned()) } } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index e564aaa65..f480fa33a 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -114,7 +114,7 @@ impl ObjectStorage for LocalFS { res } - async fn get_ingester_meta_file_paths( + async fn get_ingestor_meta_file_paths( &self, ) -> Result, ObjectStorageError> { let time = Instant::now(); @@ -129,7 +129,7 @@ impl ObjectStorage for LocalFS { .unwrap_or_default() .to_str() .unwrap_or_default() - .contains("ingester"); + .contains("ingestor"); if flag { path_arr.push( @@ -165,7 +165,7 @@ impl ObjectStorage for LocalFS { .unwrap_or_default() .to_str() .unwrap_or_default() - .contains("ingester"); + .contains("ingestor"); if flag { path_arr.push(RelativePathBuf::from_iter([ @@ -213,9 +213,9 @@ impl ObjectStorage for LocalFS { .to_str() .unwrap() .to_owned(); - let ingester_file = filter_func(path); + let ingestor_file = filter_func(path); - if !ingester_file { + if !ingestor_file { continue; } @@ -278,11 +278,11 @@ impl ObjectStorage for LocalFS { Ok(fs::remove_dir_all(path).await?) } - async fn try_delete_ingester_meta( + async fn try_delete_ingestor_meta( &self, - ingester_filename: String, + ingestor_filename: String, ) -> Result<(), ObjectStorageError> { - let path = self.root.join(ingester_filename); + let path = self.root.join(ingestor_filename); Ok(fs::remove_file(path).await?) } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index b61ccd619..a49a851d3 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -85,16 +85,16 @@ pub trait ObjectStorage: Sync + 'static { async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>; - async fn get_ingester_meta_file_paths( + async fn get_ingestor_meta_file_paths( &self, ) -> Result, ObjectStorageError>; async fn get_stream_file_paths( &self, stream_name: &str, ) -> Result, ObjectStorageError>; - async fn try_delete_ingester_meta( + async fn try_delete_ingestor_meta( &self, - ingester_filename: String, + ingestor_filename: String, ) -> Result<(), ObjectStorageError>; /// Returns the amount of time taken by the `ObjectStore` to perform a get /// call. @@ -541,7 +541,7 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { Mode::Ingest => { let addr = get_address(); let file_name = format!( - ".ingester.{}.{}{}", + ".ingestor.{}.{}{}", addr.ip(), addr.port(), SCHEMA_FILE_NAME @@ -561,7 +561,7 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { Mode::Ingest => { let addr = get_address(); let file_name = format!( - ".ingester.{}.{}{}", + ".ingestor.{}.{}{}", addr.ip(), addr.port(), STREAM_METADATA_FILE_NAME @@ -595,9 +595,9 @@ fn manifest_path(prefix: &str) -> RelativePathBuf { } #[inline(always)] -pub fn ingester_metadata_path(ip: String, port: String) -> RelativePathBuf { +pub fn ingestor_metadata_path(ip: String, port: String) -> RelativePathBuf { RelativePathBuf::from_iter([ PARSEABLE_ROOT_DIRECTORY, - &format!("ingester.{}.{}.json", ip, port), + &format!("ingestor.{}.{}.json", ip, port), ]) } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index e3a5f81fb..ae48a1d51 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -430,9 +430,9 @@ impl ObjectStorage for S3 { let mut res = vec![]; while let Some(meta) = list_stream.next().await.transpose()? { - let ingester_file = filter_func(meta.location.filename().unwrap().to_string()); + let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); - if !ingester_file { + if !ingestor_file { continue; } @@ -454,7 +454,7 @@ impl ObjectStorage for S3 { Ok(res) } - async fn get_ingester_meta_file_paths( + async fn get_ingestor_meta_file_paths( &self, ) -> Result, ObjectStorageError> { let time = Instant::now(); @@ -462,7 +462,7 @@ impl ObjectStorage for S3 { let mut object_stream = self.client.list(Some(&self.root)).await?; while let Some(meta) = object_stream.next().await.transpose()? { - let flag = meta.location.filename().unwrap().starts_with("ingester"); + let flag = meta.location.filename().unwrap().starts_with("ingestor"); if flag { path_arr.push(RelativePathBuf::from(meta.location.as_ref())); @@ -487,7 +487,7 @@ impl ObjectStorage for S3 { let mut object_stream = self.client.list(Some(&path)).await?; while let Some(meta) = object_stream.next().await.transpose()? { - let flag = meta.location.filename().unwrap().starts_with(".ingester"); + let flag = meta.location.filename().unwrap().starts_with(".ingestor"); if flag { path_arr.push(RelativePathBuf::from(meta.location.as_ref())); @@ -544,11 +544,11 @@ impl ObjectStorage for S3 { Ok(()) } - async fn try_delete_ingester_meta( + async fn try_delete_ingestor_meta( &self, - ingester_filename: String, + ingestor_filename: String, ) -> Result<(), ObjectStorageError> { - let file = RelativePathBuf::from(&ingester_filename); + let file = RelativePathBuf::from(&ingestor_filename); match self.client.delete(&to_object_store_path(&file)).await { Ok(_) => Ok(()), Err(err) => { @@ -558,7 +558,7 @@ impl ObjectStorage for S3 { log::error!("Node does not exist"); Err(err.into()) } else { - log::error!("Error deleting ingester meta file: {:?}", err); + log::error!("Error deleting ingestor meta file: {:?}", err); Err(err.into()) } }