diff --git a/server/src/about.rs b/server/src/about.rs index f33ac2137..9aea9ff28 100644 --- a/server/src/about.rs +++ b/server/src/about.rs @@ -90,10 +90,10 @@ pub fn print_about( eprint!( " {} - Version: \"v{}\"", + Version:\t\t\t\t\t\"v{}\"", "About:".to_string().bold(), current_version, - ); + ); // " " " " if let Some(latest_release) = latest_release { if latest_release.version > current_version { @@ -103,8 +103,8 @@ pub fn print_about( eprintln!( " - Commit: \"{commit_hash}\" - Docs: \"https://logg.ing/docs\"" + Commit:\t\t\t\t\t\t\"{commit_hash}\" + Docs:\t\t\t\t\t\t\"https://logg.ing/docs\"" ); } diff --git a/server/src/analytics.rs b/server/src/analytics.rs index ca8d172d6..e10311469 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -90,7 +90,7 @@ impl Report { cpu_count, memory_total_bytes: mem_total, platform: platform().to_string(), - mode: CONFIG.mode_string().to_string(), + mode: CONFIG.get_storage_mode_string().to_string(), version: current().released_version.to_string(), commit_hash: current().commit_hash, metrics: build_metrics(), diff --git a/server/src/banner.rs b/server/src/banner.rs index 0f1dc5120..d9f3cc609 100644 --- a/server/src/banner.rs +++ b/server/src/banner.rs @@ -35,13 +35,13 @@ pub async fn print(config: &Config, meta: &StorageMetadata) { fn print_ascii_art() { let ascii_name = r#" - `7MM"""Mq. *MM `7MM - MM `MM. MM MM - MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya - MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb - MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M"""""" - MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. , - .JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. `Mbmmd' + `7MM"""Mq. *MM `7MM + MM `MM. MM MM + MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya + MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb + MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M"""""" + MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. , + .JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. 
`Mbmmd' "#;
 
     eprint!("{ascii_name}");
 
@@ -77,12 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
     eprintln!(
         "
     {}
-    Address:                    {}
-    Credentials:                {}
-    LLM Status:                 \"{}\"",
+    Address:\t\t\t\t\t{}
+    Credentials:\t\t\t\t\t{}
+    Server Mode:\t\t\t\t\t\"{}\"
+    LLM Status:\t\t\t\t\t\"{}\"",
         "Server:".to_string().bold(),
         address,
         credentials,
+        config.parseable.mode.to_str(),
         llm_status
     );
 }
@@ -99,10 +101,10 @@ async fn storage_info(config: &Config) {
     eprintln!(
         "
     {}
-    Mode:                       \"{}\"
-    Staging:                    \"{}\"",
+    Storage Mode:\t\t\t\t\t\"{}\"
+    Staging Path:\t\t\t\t\t\"{}\"",
         "Storage:".to_string().bold(),
-        config.mode_string(),
+        config.get_storage_mode_string(),
        config.staging_dir().to_string_lossy(),
     );
 
@@ -114,7 +116,7 @@ async fn storage_info(config: &Config) {
         eprintln!(
             "\
-            {:8}Cache:          \"{}\", (size: {})",
+            {:8}Cache:\t\t\t\t\t\"{}\", (size: {})",
             "",
             path.display(),
             size
@@ -123,7 +125,7 @@ async fn storage_info(config: &Config) {
         eprintln!(
             "\
-            {:8}Store:          \"{}\", (latency: {:?})",
+            {:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})",
             "",
             storage.get_endpoint(),
             latency
diff --git a/server/src/catalog.rs b/server/src/catalog.rs
index f8adad1ca..cba5ddfb9 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog.rs
@@ -24,7 +24,8 @@ use relative_path::RelativePathBuf;
 use crate::{
     catalog::manifest::Manifest,
     query::PartialTimeFilter,
-    storage::{ObjectStorage, ObjectStorageError},
+    storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
+    utils::get_address,
 };
 
 use self::{column::Column, snapshot::ManifestItem};
@@ -105,20 +106,67 @@ pub async fn update_snapshot(
         item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
     });
 
+    // A matching position in the snapshot means some ingestor already created a
+    // manifest for this time window; it does not guarantee that *this* ingestor's
+    // own manifest file exists yet, so check for our file before updating in place.
    // This updates an existing file so there is no need to create a snapshot entry.
    if let Some(pos) = pos {
        let info = &mut manifests[pos];
        let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
-        let Some(mut manifest) = storage.get_manifest(&path).await? else {
-            return Err(ObjectStorageError::UnhandledError(
-                "Manifest found in snapshot but not in object-storage"
-                    .to_string()
-                    .into(),
-            ));
-        };
-        manifest.apply_change(change);
-        storage.put_manifest(&path, manifest).await?;
+
+        let mut manifest_exists = false;
+        for m in manifests.iter() {
+            let s = get_address();
+            let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE);
+            if m.manifest_path.contains(&p) {
+                manifest_exists = true;
+            }
+        }
+        if manifest_exists {
+            let Some(mut manifest) = storage.get_manifest(&path).await? else {
+                return Err(ObjectStorageError::UnhandledError(
+                    "Manifest found in snapshot but not in object-storage"
+                        .to_string()
+                        .into(),
+                ));
+            };
+            manifest.apply_change(change);
+            storage.put_manifest(&path, manifest).await?;
+        } else {
+            let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
+            let upper_bound = lower_bound
+                .date_naive()
+                .and_time(
+                    NaiveTime::from_num_seconds_from_midnight_opt(
+                        23 * 3600 + 59 * 60 + 59,
+                        999_999_999,
+                    )
+                    .unwrap(),
+                )
+                .and_utc();
+
+            let manifest = Manifest {
+                files: vec![change],
+                ..Manifest::default()
+            };
+
+            let addr = get_address();
+            let manifest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
+            let path =
+                partition_path(stream_name, lower_bound, upper_bound).join(&manifest_file_name);
+            storage
+                .put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
+                .await?;
+            let path = storage.absolute_url(&path);
+            let new_snapshot_entry = snapshot::ManifestItem {
+                manifest_path: path.to_string(),
+                time_lower_bound: lower_bound,
+                time_upper_bound: upper_bound,
+            };
+            manifests.push(new_snapshot_entry);
+            storage.put_snapshot(stream_name, meta).await?;
+        }
    } else {
        let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
        let upper_bound = lower_bound
@@ -137,7 +185,9 @@ pub async fn update_snapshot(
             ..Manifest::default()
         };
 
-        let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
+        let addr = get_address();
+        let manifest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
+        let path = partition_path(stream_name, lower_bound, upper_bound).join(&manifest_file_name);
         storage
             .put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
             .await?;
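The net effect of the catalog.rs change is that each ingestor now writes and updates its own manifest file, keyed by its address, rather than all nodes contending over a single manifest.json. A minimal sketch of the naming scheme, with get_address() stubbed out (the real helper in utils reads the configured bind address, and MANIFEST_FILE is assumed to be "manifest.json", matching the literal it replaces):

    const MANIFEST_FILE: &str = "manifest.json"; // assumed value of storage::MANIFEST_FILE

    // stand-in for utils::get_address(); the bind address here is an assumption
    fn get_address() -> (String, u16) {
        ("127.0.0.1".to_string(), 8000)
    }

    fn ingestor_manifest_file_name() -> String {
        let (ip, port) = get_address();
        // e.g. "127.0.0.1.8000.manifest.json" — unique per ingestor, so two nodes
        // never overwrite each other's manifest for the same partition window
        format!("{ip}.{port}.{MANIFEST_FILE}")
    }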
diff --git a/server/src/cli.rs b/server/src/cli.rs
new file mode 100644
index 000000000..691547f86
--- /dev/null
+++ b/server/src/cli.rs
@@ -0,0 +1,451 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use clap::{value_parser, Arg, ArgGroup, Command, FromArgMatches};
+use std::path::PathBuf;
+
+use url::Url;
+
+use crate::{
+    oidc::{self, OpenidConfig},
+    option::{validation, Compression, Mode},
+};
+
+#[derive(Debug, Default)]
+pub struct Cli {
+    /// The location of TLS Cert file
+    pub tls_cert_path: Option<PathBuf>,
+
+    /// The location of TLS Private Key file
+    pub tls_key_path: Option<PathBuf>,
+
+    /// The address on which the http server will listen.
+    pub address: String,
+
+    /// Base domain under which server is hosted.
+    /// This information is used by OIDC to refer redirects
+    pub domain_address: Option<Url>,
+
+    /// The local staging path is used as a temporary landing point
+    /// for incoming events and local cache
+    pub local_staging_path: PathBuf,
+
+    /// The local cache path is used for speeding up query on latest data
+    pub local_cache_path: Option<PathBuf>,
+
+    /// Size for local cache
+    pub local_cache_size: u64,
+
+    /// Username for the basic authentication on the server
+    pub username: String,
+
+    /// Password for the basic authentication on the server
+    pub password: String,
+
+    /// OpenId configuration
+    pub openid: Option<OpenidConfig>,
+
+    /// Server should check for update or not
+    pub check_update: bool,
+
+    /// Server should send anonymous analytics or not
+    pub send_analytics: bool,
+
+    /// Open AI access key
+    pub open_ai_key: Option<String>,
+
+    /// Livetail port
+    pub grpc_port: u16,
+
+    /// Livetail channel capacity
+    pub livetail_channel_capacity: usize,
+
+    /// Rows in Parquet Rowgroup
+    pub row_group_size: usize,
+
+    /// Query memory limit in bytes
+    pub query_memory_pool_size: Option<usize>,
+
+    /// Parquet compression algorithm
+    pub parquet_compression: Compression,
+
+    /// Mode of operation
+    pub mode: Mode,
+}
+
+impl Cli {
+    // identifiers for arguments
+    pub const TLS_CERT: &'static str = "tls-cert-path";
+    pub const TLS_KEY: &'static str = "tls-key-path";
+    pub const ADDRESS: &'static str = "address";
+    pub const DOMAIN_URI: &'static str = "origin";
+    pub const STAGING: &'static str = "local-staging-path";
+    pub const CACHE: &'static str = "cache-path";
+    pub const CACHE_SIZE: &'static str = "cache-size";
+    pub const USERNAME: &'static str = "username";
+    pub const PASSWORD: &'static str = "password";
+    pub const CHECK_UPDATE: &'static str = "check-update";
+    pub const SEND_ANALYTICS: &'static str = "send-analytics";
+    pub const OPEN_AI_KEY: &'static str = "open-ai-key";
+    pub const OPENID_CLIENT_ID: &'static str = "oidc-client";
+    pub const OPENID_CLIENT_SECRET: &'static str = "oidc-client-secret";
+    pub const OPENID_ISSUER: &'static str = "oidc-issuer";
+    pub const GRPC_PORT: &'static str = "grpc-port";
+    pub const LIVETAIL_CAPACITY: &'static str = "livetail-capacity";
+    // todo : what should this flag be
+    pub const QUERY_MEM_POOL_SIZE: &'static str = "query-mempool-size";
+    pub const ROW_GROUP_SIZE: &'static str = "row-group-size";
+    pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
+    pub const MODE: &'static str = "mode";
+    pub const DEFAULT_USERNAME: &'static str = "admin";
+    pub const DEFAULT_PASSWORD: &'static str = "admin";
+
+    pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
+        self.local_staging_path.join(stream_name)
+    }
+
+    pub fn get_scheme(&self) -> String {
+        if self.tls_cert_path.is_some() && self.tls_key_path.is_some() {
+            return "https".to_string();
+        }
+        "http".to_string()
+    }
+
+    pub fn create_cli_command_with_clap(name: &'static str) -> Command {
+        Command::new(name).next_line_help(false)
+            .arg(
+                Arg::new(Self::TLS_CERT)
+                    .long(Self::TLS_CERT)
+                    .env("P_TLS_CERT_PATH")
+                    .value_name("PATH")
+                    .value_parser(validation::file_path)
+                    .help("Local path on this device where certificate file is located. Required to enable TLS"),
+            )
+            .arg(
+                Arg::new(Self::TLS_KEY)
+                    .long(Self::TLS_KEY)
+                    .env("P_TLS_KEY_PATH")
+                    .value_name("PATH")
+                    .value_parser(validation::file_path)
+                    .help("Local path on this device where private key file is located. Required to enable TLS"),
+            )
+            .arg(
+                Arg::new(Self::ADDRESS)
+                    .long(Self::ADDRESS)
+                    .env("P_ADDR")
+                    .value_name("ADDR:PORT")
+                    .default_value("0.0.0.0:8000")
+                    .value_parser(validation::socket_addr)
+                    .help("Address and port for Parseable HTTP(s) server"),
+            )
+            .arg(
+                Arg::new(Self::STAGING)
+                    .long(Self::STAGING)
+                    .env("P_STAGING_DIR")
+                    .value_name("DIR")
+                    .default_value("./staging")
+                    .value_parser(validation::canonicalize_path)
+                    .help("Local path on this device to be used as landing point for incoming events")
+                    .next_line_help(true),
+            )
+            .arg(
+                Arg::new(Self::CACHE)
+                    .long(Self::CACHE)
+                    .env("P_CACHE_DIR")
+                    .value_name("DIR")
+                    .value_parser(validation::canonicalize_path)
+                    .help("Local path on this device to be used for caching data")
+                    .next_line_help(true),
+            )
+            .arg(
+                Arg::new(Self::CACHE_SIZE)
+                    .long(Self::CACHE_SIZE)
+                    .env("P_CACHE_SIZE")
+                    .value_name("size")
+                    .default_value("1GiB")
+                    .value_parser(validation::cache_size)
+                    .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
+                    .next_line_help(true),
+            )
+            .arg(
+                Arg::new(Self::USERNAME)
+                    .long(Self::USERNAME)
+                    .env("P_USERNAME")
+                    .value_name("STRING")
+                    .required(true)
+                    .help("Admin username to be set for this Parseable server"),
+            )
+            .arg(
+                Arg::new(Self::PASSWORD)
+                    .long(Self::PASSWORD)
+                    .env("P_PASSWORD")
+                    .value_name("STRING")
+                    .required(true)
+                    .help("Admin password to be set for this Parseable server"),
+            )
+            .arg(
+                Arg::new(Self::CHECK_UPDATE)
+                    .long(Self::CHECK_UPDATE)
+                    .env("P_CHECK_UPDATE")
+                    .value_name("BOOL")
+                    .required(false)
+                    .default_value("true")
+                    .value_parser(value_parser!(bool))
+                    .help("Enable/Disable checking for new Parseable release"),
+            )
+            .arg(
+                Arg::new(Self::SEND_ANALYTICS)
+                    .long(Self::SEND_ANALYTICS)
+                    .env("P_SEND_ANONYMOUS_USAGE_DATA")
+                    .value_name("BOOL")
+                    .required(false)
+                    .default_value("true")
+                    .value_parser(value_parser!(bool))
+                    .help("Enable/Disable anonymous telemetry data collection"),
+            )
+            .arg(
+                Arg::new(Self::OPEN_AI_KEY)
+                    .long(Self::OPEN_AI_KEY)
+                    .env("P_OPENAI_API_KEY")
+                    .value_name("STRING")
+                    .required(false)
+                    .help("OpenAI key to enable llm features"),
+            )
+            .arg(
+                Arg::new(Self::OPENID_CLIENT_ID)
+                    .long(Self::OPENID_CLIENT_ID)
+                    .env("P_OIDC_CLIENT_ID")
+                    .value_name("STRING")
+                    .required(false)
+                    .help("Client id for OIDC provider"),
+            )
+            .arg(
+                Arg::new(Self::OPENID_CLIENT_SECRET)
+                    .long(Self::OPENID_CLIENT_SECRET)
+                    .env("P_OIDC_CLIENT_SECRET")
+                    .value_name("STRING")
+                    .required(false)
+                    .help("Client secret for OIDC provider"),
+            )
+            .arg(
+                Arg::new(Self::OPENID_ISSUER)
+                    .long(Self::OPENID_ISSUER)
+                    .env("P_OIDC_ISSUER")
+                    .value_name("URL")
+                    .required(false)
+                    .value_parser(validation::url)
+                    .help("OIDC provider's host address"),
+            )
+            .arg(
+                Arg::new(Self::DOMAIN_URI)
+                    .long(Self::DOMAIN_URI)
+                    .env("P_ORIGIN_URI")
+                    .value_name("URL")
+                    .required(false)
+                    .value_parser(validation::url)
+                    .help("Parseable server global domain address"),
+            )
+            .arg(
+                Arg::new(Self::GRPC_PORT)
+                    .long(Self::GRPC_PORT)
+                    .env("P_GRPC_PORT")
+                    .value_name("PORT")
+                    .default_value("8001")
+                    .required(false)
+                    .value_parser(value_parser!(u16))
+                    .help("Port for gRPC server"),
+            )
+            .arg(
+                Arg::new(Self::LIVETAIL_CAPACITY)
+                    .long(Self::LIVETAIL_CAPACITY)
+                    .env("P_LIVETAIL_CAPACITY")
+                    .value_name("NUMBER")
+                    .default_value("1000")
+                    .required(false)
+                    .value_parser(value_parser!(usize))
+                    .help("Number of rows in livetail channel"),
+            )
+            .arg(
+                Arg::new(Self::QUERY_MEM_POOL_SIZE)
+                    .long(Self::QUERY_MEM_POOL_SIZE)
+                    .env("P_QUERY_MEMORY_LIMIT")
+                    .value_name("GiB")
+                    .required(false)
+                    .value_parser(value_parser!(u8))
+                    .help("Set a fixed memory limit for query"),
+            )
+            .arg(
+                Arg::new(Self::ROW_GROUP_SIZE)
+                    .long(Self::ROW_GROUP_SIZE)
+                    .env("P_PARQUET_ROW_GROUP_SIZE")
+                    .value_name("NUMBER")
+                    .required(false)
+                    .default_value("16384")
+                    .value_parser(value_parser!(usize))
+                    .help("Number of rows in a row group"),
+            ).arg(
+                Arg::new(Self::MODE)
+                    .long(Self::MODE)
+                    .env("P_MODE")
+                    .value_name("STRING")
+                    .required(false)
+                    .default_value("all")
+                    .value_parser([
+                        "query",
+                        "ingest",
+                        "all"])
+                    .help("Mode of operation"),
+            )
+            .arg(
+                Arg::new(Self::PARQUET_COMPRESSION_ALGO)
+                    .long(Self::PARQUET_COMPRESSION_ALGO)
+                    .env("P_PARQUET_COMPRESSION_ALGO")
+                    .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]")
+                    .required(false)
+                    .default_value("lz4")
+                    .value_parser([
+                        "uncompressed",
+                        "snappy",
+                        "gzip",
+                        "lzo",
+                        "brotli",
+                        "lz4",
+                        "zstd"])
+                    .help("Parquet compression algorithm"),
+            ).group(
+                ArgGroup::new("oidc")
+                    .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
+                    .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
+                    .multiple(true)
+            )
+    }
+}
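Taken together with the FromArgMatches impl that follows, the builder above is meant to be driven from main roughly like this — a minimal sketch, with the binary name assumed (the actual wiring lives outside this diff):

    use clap::FromArgMatches;

    fn parse_cli() -> Result<Cli, clap::Error> {
        // build the clap Command declared above, parse argv/env, then hydrate Cli
        let matches = Cli::create_cli_command_with_clap("parseable").get_matches();
        Cli::from_arg_matches(&matches)
    }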
+
+impl FromArgMatches for Cli {
+    fn from_arg_matches(m: &clap::ArgMatches) -> Result<Self, clap::Error> {
+        let mut s: Self = Self::default();
+        s.update_from_arg_matches(m)?;
+        Ok(s)
+    }
+
+    fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
+        self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
+        self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
+        self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
+        self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
+
+        self.address = m
+            .get_one::<String>(Self::ADDRESS)
+            .cloned()
+            .expect("default value for address");
+        self.local_staging_path = m
+            .get_one::<PathBuf>(Self::STAGING)
+            .cloned()
+            .expect("default value for staging");
+        self.local_cache_size = m
+            .get_one::<u64>(Self::CACHE_SIZE)
+            .cloned()
+            .expect("default value for cache size");
+        self.username = m
+            .get_one::<String>(Self::USERNAME)
+            .cloned()
+            .expect("default for username");
+        self.password = m
+            .get_one::<String>(Self::PASSWORD)
+            .cloned()
+            .expect("default for password");
+        self.check_update = m
+            .get_one::<bool>(Self::CHECK_UPDATE)
+            .cloned()
+            .expect("default for check update");
+        self.send_analytics = m
+            .get_one::<bool>(Self::SEND_ANALYTICS)
+            .cloned()
+            .expect("default for send analytics");
+        self.open_ai_key = m.get_one::<String>(Self::OPEN_AI_KEY).cloned();
+        self.grpc_port = m
+            .get_one::<u16>(Self::GRPC_PORT)
+            .cloned()
+            .expect("default for livetail port");
+        self.livetail_channel_capacity = m
+            .get_one::<usize>(Self::LIVETAIL_CAPACITY)
+            .cloned()
+            .expect("default for livetail capacity");
+        // converts GiB to bytes before assigning
+        self.query_memory_pool_size = m
+            .get_one::<u8>(Self::QUERY_MEM_POOL_SIZE)
+            .cloned()
+            .map(|gib| gib as usize * 1024usize.pow(3));
+        self.row_group_size = m
+            .get_one::<usize>(Self::ROW_GROUP_SIZE)
+            .cloned()
+            .expect("default for row_group size");
+        self.parquet_compression = match m
+            .get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
+            .expect("default for compression algo")
+            .as_str()
+        {
+            "uncompressed" => Compression::UNCOMPRESSED,
+            "snappy" => Compression::SNAPPY,
+            "gzip" => Compression::GZIP,
+            "lzo" => Compression::LZO,
+            "brotli" => Compression::BROTLI,
+            "lz4" => Compression::LZ4,
+            "zstd" => Compression::ZSTD,
+            _ => unreachable!(),
+        };
+
+        let openid_client_id = m.get_one::<String>(Self::OPENID_CLIENT_ID).cloned();
+        let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
+        let openid_issuer = m.get_one::<Url>(Self::OPENID_ISSUER).cloned();
+
+        self.openid = match (openid_client_id, openid_client_secret, openid_issuer) {
+            (Some(id), Some(secret), Some(issuer)) => {
+                let origin = if let Some(url) = self.domain_address.clone() {
+                    oidc::Origin::Production(url)
+                } else {
+                    oidc::Origin::Local {
+                        socket_addr: self.address.clone(),
+                        https: self.tls_cert_path.is_some() && self.tls_key_path.is_some(),
+                    }
+                };
+                Some(OpenidConfig {
+                    id,
+                    secret,
+                    issuer,
+                    origin,
+                })
+            }
+            _ => None,
+        };
+
+        self.mode = match m
+            .get_one::<String>(Self::MODE)
+            .expect("Mode not set")
+            .as_str()
+        {
+            "query" => Mode::Query,
+            "ingest" => Mode::Ingest,
+            "all" => Mode::All,
+            _ => unreachable!(),
+        };
+
+        Ok(())
+    }
+}
diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs
index e30e3d77a..ea69b0b59 100644
--- a/server/src/handlers/http.rs
+++ b/server/src/handlers/http.rs
@@ -16,341 +16,30 @@
  *
  */
 
-use std::fs::File;
-use std::io::BufReader;
-use std::sync::Arc;
-
 use actix_cors::Cors;
-use actix_web::{
-    web::{self, resource},
-    App, HttpServer,
-};
-use actix_web_prometheus::PrometheusMetrics;
-use actix_web_static_files::ResourceFiles;
-use log::info;
-use openid::Discovered;
-use rustls::{Certificate, PrivateKey, ServerConfig};
-use rustls_pemfile::{certs, pkcs8_private_keys};
-
-use crate::option::CONFIG;
-use crate::rbac::role::Action;
-use self::middleware::{DisAllowRootUser, ModeFilter, RouteExt};
-
-mod about;
-mod health_check;
-mod ingest;
+pub(crate) mod about;
+pub(crate) mod health_check;
+pub(crate) mod ingest;
 mod kinesis;
-mod llm;
-mod logstream;
-mod middleware;
-mod oidc;
+pub(crate) mod llm;
+pub(crate) mod logstream;
+pub(crate) mod middleware;
+pub(crate) mod modal;
+pub(crate) mod oidc;
 mod otel;
-mod query;
-mod rbac;
-mod role;
-
-include!(concat!(env!("OUT_DIR"), "/generated.rs"));
-
-const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
-const API_BASE_PATH: &str = "/api";
-const API_VERSION: &str = "v1";
-
-pub async fn run_http(
-    prometheus: PrometheusMetrics,
-    oidc_client: Option<crate::oidc::OpenidConfig>,
-) -> anyhow::Result<()> {
-    let oidc_client = match oidc_client {
-        Some(config) => {
-            let client = config
-                .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
-                .await?;
-            Some(Arc::new(client))
-        }
-        None => None,
-    };
-
-    let create_app = move || {
-        App::new()
-            .wrap(prometheus.clone())
-            .configure(|cfg| configure_routes(cfg, oidc_client.clone()))
-            .wrap(actix_web::middleware::Logger::default())
-            .wrap(actix_web::middleware::Compress::default())
-            .wrap(cross_origin_config())
-            .wrap(ModeFilter)
-    };
+pub(crate) mod query;
+pub(crate) mod rbac;
+pub(crate) mod role;
 
-    let ssl_acceptor = match (
-        &CONFIG.parseable.tls_cert_path,
-        &CONFIG.parseable.tls_key_path,
-    ) {
-        (Some(cert), Some(key)) => {
-            // init server config builder with safe defaults
-            let config = ServerConfig::builder()
-                .with_safe_defaults()
-                .with_no_client_auth();
-
-            // load TLS key/cert files
-            let cert_file = &mut BufReader::new(File::open(cert)?);
-            let key_file = &mut BufReader::new(File::open(key)?);
-
-            // convert files to key/cert objects
-            let cert_chain = certs(cert_file)?.into_iter().map(Certificate).collect();
-
-            let mut keys: Vec<PrivateKey> = pkcs8_private_keys(key_file)?
- .into_iter() - .map(PrivateKey) - .collect(); - - // exit if no keys could be parsed - if keys.is_empty() { - anyhow::bail!("Could not locate PKCS 8 private keys."); - } - - let server_config = config.with_single_cert(cert_chain, keys.remove(0))?; - - Some(server_config) - } - (_, _) => None, - }; - - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(create_app).workers(num_cpus::get()); - if let Some(config) = ssl_acceptor { - http_server - .bind_rustls(&CONFIG.parseable.address, config)? - .run() - .await?; - } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } - - Ok(()) -} - -pub fn configure_routes( - cfg: &mut web::ServiceConfig, - oidc_client: Option>>, -) { - let generated = generate(); - - //log stream API - let logstream_api = web::scope("/{logstream}") - .service( - web::resource("") - // PUT "/logstream/{logstream}" ==> Create log stream - .route( - web::put() - .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), - ) - // POST "/logstream/{logstream}" ==> Post logs to given log stream - .route( - web::post() - .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), - ) - // DELETE "/logstream/{logstream}" ==> Delete log stream - .route( - web::delete() - .to(logstream::delete) - .authorize_for_stream(Action::DeleteStream), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) - .service( - web::resource("/alert") - // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream - .route( - web::put() - .to(logstream::put_alert) - .authorize_for_stream(Action::PutAlert), - ) - // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream - .route( - web::get() - .to(logstream::get_alert) - .authorize_for_stream(Action::GetAlert), - ), - ) - .service( - // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource("/schema").route( - web::get() - .to(logstream::schema) - .authorize_for_stream(Action::GetSchema), - ), - ) - .service( - // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream - web::resource("/stats").route( - web::get() - .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), - ), - ) - .service( - web::resource("/retention") - // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_retention) - .authorize_for_stream(Action::PutRetention), - ) - // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream - .route( - web::get() - .to(logstream::get_retention) - .authorize_for_stream(Action::GetRetention), - ), - ) - .service( - web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ) - // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream - .route( - web::get() - .to(logstream::get_cache_enabled) - .authorize_for_stream(Action::GetCacheEnabled), - ), - ); - - // User API - let user_api = web::scope("/user") - .service( - web::resource("") - // GET /user => List all users - .route(web::get().to(rbac::list_users).authorize(Action::ListUser)), - ) - .service( - web::resource("/{username}") - // PUT /user/{username} => Create a new user - .route(web::post().to(rbac::post_user).authorize(Action::PutUser)) - // DELETE /user/{username} => Delete a user - .route( - web::delete() - .to(rbac::delete_user) - 
.authorize(Action::DeleteUser), - ) - .wrap(DisAllowRootUser), - ) - .service( - web::resource("/{username}/role") - // PUT /user/{username}/roles => Put roles for user - .route( - web::put() - .to(rbac::put_role) - .authorize(Action::PutUserRoles) - .wrap(DisAllowRootUser), - ) - .route( - web::get() - .to(rbac::get_role) - .authorize_for_user(Action::GetUserRoles), - ), - ) - .service( - web::resource("/{username}/generate-new-password") - // POST /user/{username}/generate-new-password => reset password for this user - .route( - web::post() - .to(rbac::post_gen_password) - .authorize(Action::PutUser) - .wrap(DisAllowRootUser), - ), - ); - - let llm_query_api = web::scope("/llm").service( - web::resource("").route( - web::post() - .to(llm::make_llm_request) - .authorize(Action::QueryLLM), - ), - ); - - let role_api = web::scope("/role") - // GET Role List - .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) - .service( - // PUT and GET Default Role - resource("/default") - .route(web::put().to(role::put_default).authorize(Action::PutRole)) - .route(web::get().to(role::get_default).authorize(Action::GetRole)), - ) - .service( - // PUT, GET, DELETE Roles - resource("/{name}") - .route(web::put().to(role::put).authorize(Action::PutRole)) - .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) - .route(web::get().to(role::get).authorize(Action::GetRole)), - ); - - let mut oauth_api = web::scope("/o") - .service(resource("/login").route(web::get().to(oidc::login))) - .service(resource("/logout").route(web::get().to(oidc::logout))) - .service(resource("/code").route(web::get().to(oidc::reply_login))); - - if let Some(client) = oidc_client { - info!("Registered oidc client"); - oauth_api = oauth_api.app_data(web::Data::from(client)) - } - - // Deny request if username is same as the env variable P_USERNAME. 
- cfg.service( - // Base path "{url}/api/v1" - web::scope(&base_path()) - // .wrap(PathFilter) - // POST "/query" ==> Get results of the SQL query passed in request body - .service( - web::resource("/query") - .route(web::post().to(query::query).authorize(Action::Query)), - ) - // POST "/ingest" ==> Post logs to given log stream based on header - .service( - web::resource("/ingest") - .route( - web::post() - .to(ingest::ingest) - .authorize_for_stream(Action::Ingest), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) - // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command - .service(web::resource("/liveness").route(web::get().to(health_check::liveness))) - // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes - .service(web::resource("/readiness").route(web::get().to(health_check::readiness))) - // GET "/about" ==> Returns information about instance - .service( - web::resource("/about") - .route(web::get().to(about::about).authorize(Action::GetAbout)), - ) - .service( - web::scope("/logstream") - .service( - // GET "/logstream" ==> Get list of all Log Streams on the server - web::resource("") - .route(web::get().to(logstream::list).authorize(Action::ListStream)), - ) - .service( - // logstream API - logstream_api, - ), - ) - .service(user_api) - .service(llm_query_api) - .service(oauth_api) - .service(role_api), - ) - // GET "/" ==> Serve the static frontend directory - .service(ResourceFiles::new("/", generated).resolve_not_found_to_root()); -} +pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; +pub const API_BASE_PATH: &str = "/api"; +pub const API_VERSION: &str = "v1"; -fn base_path() -> String { +pub(crate) fn base_path() -> String { format!("{API_BASE_PATH}/{API_VERSION}") } @@ -358,7 +47,7 @@ pub fn metrics_path() -> String { format!("{}/metrics", base_path()) } -fn cross_origin_config() -> Cors { +pub(crate) fn cross_origin_config() -> Cors { if cfg!(feature = "debug") { Cors::permissive().block_on_origin_mismatch(false) } else { diff --git a/server/src/handlers/http/about.rs b/server/src/handlers/http/about.rs index 3f42ccc4f..7e5d82653 100644 --- a/server/src/handlers/http/about.rs +++ b/server/src/handlers/http/about.rs @@ -40,7 +40,7 @@ pub async fn about() -> Json { let current_version = format!("v{}", current_release.released_version); let commit = current_release.commit_hash; let deployment_id = meta.deployment_id.to_string(); - let mode = CONFIG.mode_string(); + let mode = CONFIG.get_storage_mode_string(); let staging = CONFIG.staging_dir(); let grpc_port = CONFIG.parseable.grpc_port; diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index afd057eda..6b8592331 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -120,9 +120,8 @@ pub async fn put_stream(req: HttpRequest) -> Result ), status: StatusCode::BAD_REQUEST, }); - } else { - create_stream(stream_name).await?; } + create_stream(stream_name).await?; Ok(("log stream created", StatusCode::OK)) } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs new file mode 100644 index 000000000..cfe1cbf23 --- /dev/null +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -0,0 +1,287 @@ +/* + * Parseable Server (C) 2022 - 
2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::analytics;
+use crate::banner;
+use crate::handlers::http::logstream;
+use crate::handlers::http::middleware::RouteExt;
+use crate::localcache::LocalCacheManager;
+use crate::metadata;
+use crate::metrics;
+use crate::rbac;
+use crate::rbac::role::Action;
+use crate::storage;
+use crate::storage::ObjectStorageError;
+use crate::storage::PARSEABLE_METADATA_FILE_NAME;
+use crate::sync;
+
+use std::net::SocketAddr;
+
+use super::server::Server;
+use super::ssl_acceptor::get_ssl_acceptor;
+use super::IngesterMetadata;
+use super::OpenIdClient;
+use super::ParseableServer;
+use super::DEFAULT_VERSION;
+
+use actix_web::body::MessageBody;
+use actix_web::Scope;
+use actix_web::{web, App, HttpServer};
+use actix_web_prometheus::PrometheusMetrics;
+use async_trait::async_trait;
+use relative_path::RelativePathBuf;
+use url::Url;
+
+use crate::{
+    handlers::http::{base_path, cross_origin_config},
+    option::CONFIG,
+};
+
+#[derive(Default)]
+pub struct IngestServer;
+
+#[async_trait(?Send)]
+impl ParseableServer for IngestServer {
+    // we don't need an OIDC client here; the parameter only exists to satisfy the trait
+    async fn start(
+        &self,
+        prometheus: PrometheusMetrics,
+        _oidc_client: Option<crate::oidc::OpenidConfig>,
+    ) -> anyhow::Result<()> {
+        // set the ingestor metadata
+        self.set_ingestor_metadata().await?;
+
+        // get the ssl stuff
+        let ssl = get_ssl_acceptor(
+            &CONFIG.parseable.tls_cert_path,
+            &CONFIG.parseable.tls_key_path,
+        )?;
+
+        // fn that creates the app
+        let create_app_fn = move || {
+            App::new()
+                .wrap(prometheus.clone())
+                .configure(|config| IngestServer::configure_routes(config, None))
+                .wrap(actix_web::middleware::Logger::default())
+                .wrap(actix_web::middleware::Compress::default())
+                .wrap(cross_origin_config())
+        };
+
+        // concurrent workers equal to number of logical cores
+        let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
+
+        if let Some(config) = ssl {
+            http_server
+                .bind_rustls(&CONFIG.parseable.address, config)?
+                .run()
+                .await?;
+        } else {
+            http_server.bind(&CONFIG.parseable.address)?.run().await?;
+        }
+
+        Ok(())
+    }
+
+    /// implement the init method; it just invokes the initialize method
+    async fn init(&self) -> anyhow::Result<()> {
+        // self.validate()?;
+        self.initialize().await
+    }
+
+    #[allow(unused)]
+    fn validate(&self) -> anyhow::Result<()> {
+        if CONFIG.get_storage_mode_string() == "Local drive" {
+            return Err(anyhow::Error::msg(
+                // Error message can be better
+                "Ingest Server cannot be started in local storage mode. Please start the server in a supported storage mode.",
+            ));
+        }
+
+        Ok(())
+    }
+}
+
+impl IngestServer {
+    // configure the api routes
+    fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
+        config
+            .service(
+                // Base path "{url}/api/v1"
+                web::scope(&base_path()).service(Server::get_ingest_factory()),
+            )
+            .service(Server::get_liveness_factory())
+            .service(Server::get_readiness_factory())
+            .service(Self::get_metrics_webscope());
+    }
+
+    fn get_metrics_webscope() -> Scope {
+        web::scope("/logstream").service(
+            web::scope("/{logstream}")
+                .service(
+                    // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
+                    web::resource("/schema").route(
+                        web::get()
+                            .to(logstream::schema)
+                            .authorize_for_stream(Action::GetSchema),
+                    ),
+                )
+                .service(
+                    web::resource("/stats").route(
+                        web::get()
+                            .to(logstream::get_stats)
+                            .authorize_for_stream(Action::GetStats),
+                    ),
+                ),
+        )
+    }
+
+    #[inline(always)]
+    fn get_ingestor_address(&self) -> SocketAddr {
+        // this might cause an issue down the line
+        // best is to make the Cli struct better, but that's a chore
+        (CONFIG.parseable.address.clone())
+            .parse::<SocketAddr>()
+            .unwrap()
+    }
+
+    // create the ingestor metadata and put the .ingestor.json file in the object store
+    async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
+        let store = CONFIG.storage().get_object_store();
+
+        // remove ip and go with the domain name
+        let sock = self.get_ingestor_address();
+        let path = RelativePathBuf::from(format!(
+            "ingestor.{}.{}.json",
+            sock.ip(), // this might be wrong
+            sock.port()
+        ));
+
+        if store.get_object(&path).await.is_ok() {
+            println!("Ingestor metadata already exists");
+            return Ok(());
+        };
+
+        let resource = IngesterMetadata::new(
+            sock.port().to_string(),
+            CONFIG
+                .parseable
+                .domain_address
+                .clone()
+                .unwrap_or_else(|| {
+                    Url::parse(&format!("http://{}:{}", sock.ip(), sock.port())).unwrap()
+                })
+                .to_string(),
+            DEFAULT_VERSION.to_string(),
+            store.get_bucket_name(),
+            &CONFIG.parseable.username,
+            &CONFIG.parseable.password, // is this secure?
+        );
+
+        let resource = serde_json::to_string(&resource)
+            .unwrap()
+            .try_into_bytes()
+            .unwrap();
+
+        store.put_object(&path, resource).await?;
+
+        Ok(())
+    }
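A worked example of the path scheme above, with the bind address assumed: an ingestor on 0.0.0.0:8000 registers itself at the bucket root as ingestor.0.0.0.0.8000.json.

    use std::net::SocketAddr;

    fn main() {
        let sock: SocketAddr = "0.0.0.0:8000".parse().unwrap();
        // mirrors the format! call in set_ingestor_metadata above
        let path = format!("ingestor.{}.{}.json", sock.ip(), sock.port());
        assert_eq!(path, "ingestor.0.0.0.0.8000.json");
    }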
+
+    // check for querier state: is it there now, or was it there in the past?
+    // this should happen before we set the ingestor metadata
+    async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> {
+        // how do we check for querier state?
+        // based on the workflow of the system, the querier will always need to start first,
+        // i.e. the querier will create the `.parseable.json` file
+
+        let store = CONFIG.storage().get_object_store();
+        let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
+
+        match store.get_object(&path).await {
+            Ok(_) => Ok(()),
+            Err(_) => Err(ObjectStorageError::Custom(
+                "Query Server has not been started yet. Please start the querier server first."
+                    .to_string(),
+            )),
+        }
+    }
+
+    async fn initialize(&self) -> anyhow::Result<()> {
+        // check for querier state: is it there, or was it there in the past?
+        self.check_querier_state().await?;
+        // to get the .parseable.json file in staging
+        let meta = storage::resolve_parseable_metadata().await?;
+        banner::print(&CONFIG, &meta).await;
+
+        rbac::map::init(&meta);
+
+        // set the info in the global metadata
+        meta.set_global();
+
+        if let Some(cache_manager) = LocalCacheManager::global() {
+            cache_manager
+                .validate(CONFIG.parseable.local_cache_size)
+                .await?;
+        };
+
+        let prom = metrics::build_metrics_handler();
+        CONFIG.storage().register_store_metrics(&prom);
+
+        let storage = CONFIG.storage().get_object_store();
+        if let Err(err) = metadata::STREAM_INFO.load(&*storage).await {
+            log::warn!("could not populate local metadata. {:?}", err);
+        }
+
+        metrics::fetch_stats_from_storage().await;
+
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
+        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
+            sync::object_store_sync();
+
+        // all internal data structures populated now.
+        // start the analytics scheduler if enabled
+        if CONFIG.parseable.send_analytics {
+            analytics::init_analytics_scheduler();
+        }
+        let app = self.start(prom, CONFIG.parseable.openid.clone());
+        tokio::pin!(app);
+        loop {
+            tokio::select! {
+                e = &mut app => {
+                    // actix server finished .. stop other threads and stop the server
+                    remote_sync_inbox.send(()).unwrap_or(());
+                    localsync_inbox.send(()).unwrap_or(());
+                    localsync_handler.join().unwrap_or(());
+                    remote_sync_handler.join().unwrap_or(());
+                    return e
+                },
+                _ = &mut localsync_outbox => {
+                    // crash the server if localsync fails for any reason
+                    // panic!("Local Sync thread died. Server will fail now!")
+                    return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
+                },
+                _ = &mut remote_sync_outbox => {
+                    // remote_sync failed; this is recoverable by just starting the remote_sync thread again
+                    remote_sync_handler.join().unwrap_or(());
+                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
+                }
+
+            };
+        }
+    }
+}
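Both server flavours reuse the select!-based supervision loop that closes the file above: the HTTP future and the two sync channels race, and only a remote-sync failure is survivable. A self-contained model of that pattern (all names illustrative, not from the diff):

    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        let (_local_tx, mut local_rx) = oneshot::channel::<()>();
        let (remote_tx, mut remote_rx) = oneshot::channel::<()>();
        drop(remote_tx); // simulate the remote sync task dying

        tokio::select! {
            // fatal: the real server returns an error and shuts down
            _ = &mut local_rx => panic!("local sync died"),
            // recoverable: the real server respawns object_store_sync() and
            // loops back into select! with the fresh channel
            _ = &mut remote_rx => {
                let (_tx, rx) = oneshot::channel::<()>();
                remote_rx = rx;
            }
        }
        let _ = remote_rx; // keep the respawned receiver alive in this demo
    }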
diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs
new file mode 100644
index 000000000..5881b3bfb
--- /dev/null
+++ b/server/src/handlers/http/modal/mod.rs
@@ -0,0 +1,133 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+pub mod ingest_server;
+pub mod query_server;
+pub mod server;
+pub mod ssl_acceptor;
+
+use std::sync::Arc;
+
+use actix_web_prometheus::PrometheusMetrics;
+use async_trait::async_trait;
+use openid::Discovered;
+
+use crate::oidc;
+use base64::Engine;
+use serde::Deserialize;
+use serde::Serialize;
+pub type OpenIdClient = Arc<openid::Client<Discovered, oidc::Claims>>;
+
+// to be decided: what the default version should be
+pub const DEFAULT_VERSION: &str = "v3";
+
+include!(concat!(env!("OUT_DIR"), "/generated.rs"));
+
+#[async_trait(?Send)]
+pub trait ParseableServer {
+    // async fn validate(&self) -> Result<(), ObjectStorageError>;
+
+    /// configure the server
+    async fn start(
+        &self,
+        prometheus: PrometheusMetrics,
+        oidc_client: Option<oidc::OpenidConfig>,
+    ) -> anyhow::Result<()>;
+
+    async fn init(&self) -> anyhow::Result<()>;
+
+    fn validate(&self) -> anyhow::Result<()>;
+}
+
+#[derive(Serialize, Debug, Deserialize, Default, Clone, Eq, PartialEq)]
+pub struct IngesterMetadata {
+    pub version: String,
+    pub port: String,
+    pub domain_name: String,
+    pub bucket_name: String,
+    pub token: String,
+}
+
+impl IngesterMetadata {
+    pub fn new(
+        port: String,
+        domain_name: String,
+        version: String,
+        bucket_name: String,
+        username: &str,
+        password: &str,
+    ) -> Self {
+        let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));
+
+        let token = format!("Basic {}", token);
+
+        Self {
+            port,
+            domain_name,
+            version,
+            bucket_name,
+            token,
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use actix_web::body::MessageBody;
+    use rstest::rstest;
+
+    use super::{IngesterMetadata, DEFAULT_VERSION};
+
+    #[rstest]
+    fn test_deserialize_resource() {
+        let lhs: IngesterMetadata = IngesterMetadata::new(
+            "8000".to_string(),
+            "https://localhost:8000".to_string(),
+            DEFAULT_VERSION.to_string(),
+            "somebucket".to_string(),
+            "admin",
+            "admin",
+        );
+
+        let rhs = serde_json::from_slice::<IngesterMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#).unwrap();
+
+        assert_eq!(rhs, lhs);
+    }
+
+    #[rstest]
+    fn test_serialize_resource() {
+        let im = IngesterMetadata::new(
+            "8000".to_string(),
+            "https://localhost:8000".to_string(),
+            DEFAULT_VERSION.to_string(),
+            "somebucket".to_string(),
+            "admin",
+            "admin",
+        );
+
+        let lhs = serde_json::to_string(&im)
+            .unwrap()
+            .try_into_bytes()
+            .unwrap();
+        let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#
+            .try_into_bytes()
+            .unwrap();
+
+        assert_eq!(lhs, rhs);
+    }
+}
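The ParseableServer trait above exists so the binary can pick a server flavour at startup. The dispatch itself is not part of this diff; presumably it looks something like this sketch, given the Mode enum from option:

    // hypothetical startup dispatch (not shown in this diff)
    let server: Box<dyn ParseableServer> = match CONFIG.parseable.mode {
        Mode::Query => Box::new(QueryServer),
        Mode::Ingest => Box::new(IngestServer),
        Mode::All => Box::new(Server),
    };
    server.init().await?;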
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
new file mode 100644
index 000000000..acd5c7579
--- /dev/null
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -0,0 +1,247 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
+use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
+use actix_web::http::header;
+use actix_web::web;
+use actix_web::web::ServiceConfig;
+use actix_web::{App, HttpServer};
+use async_trait::async_trait;
+use itertools::Itertools;
+use relative_path::RelativePathBuf;
+use std::sync::Arc;
+use tokio::io::AsyncWriteExt;
+use url::Url;
+
+use tokio::fs::File as TokioFile;
+
+use crate::option::CONFIG;
+
+use super::server::Server;
+use super::ssl_acceptor::get_ssl_acceptor;
+use super::{IngesterMetadata, OpenIdClient, ParseableServer};
+
+type IngesterMetadataArr = Vec<IngesterMetadata>;
+
+#[derive(Default, Debug)]
+pub struct QueryServer;
+
+#[async_trait(?Send)]
+impl ParseableServer for QueryServer {
+    async fn start(
+        &self,
+        prometheus: actix_web_prometheus::PrometheusMetrics,
+        oidc_client: Option<crate::oidc::OpenidConfig>,
+    ) -> anyhow::Result<()> {
+        let data = Self::get_ingestor_info().await?;
+
+        // on subsequent runs, the querier should check whether each ingestor is up and running
+        for ingester in data.iter() {
+            // note: the format! call needs no '/' separator; ingester.domain_name
+            // already ends with one because Url::parse adds it when missing.
+            // uri should be something like `http://address/api/v1/liveness`
+            let uri = Url::parse(&format!(
+                "{}{}/liveness",
+                &ingester.domain_name,
+                base_path()
+            ))?;
+
+            if !Self::check_liveness(uri).await {
+                eprintln!("Ingestor at {} is not reachable", &ingester.domain_name);
+            } else {
+                println!("Ingestor at {} is up and running", &ingester.domain_name);
+            }
+        }
+
+        let oidc_client = match oidc_client {
+            Some(config) => {
+                let client = config
+                    .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
+                    .await?;
+                Some(Arc::new(client))
+            }
+
+            None => None,
+        };
+
+        let ssl = get_ssl_acceptor(
+            &CONFIG.parseable.tls_cert_path,
+            &CONFIG.parseable.tls_key_path,
+        )?;
+
+        let create_app_fn = move || {
+            App::new()
+                .wrap(prometheus.clone())
+                .configure(|config| QueryServer::configure_routes(config, oidc_client.clone()))
+                .wrap(actix_web::middleware::Logger::default())
+                .wrap(actix_web::middleware::Compress::default())
+                .wrap(cross_origin_config())
+        };
+
+        // concurrent workers equal to number of cores on the cpu
+        let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
+        if let Some(config) = ssl {
+            http_server
+                .bind_rustls(&CONFIG.parseable.address, config)?
+                .run()
+                .await?;
+        } else {
+            http_server.bind(&CONFIG.parseable.address)?.run().await?;
+        }
+
+        Ok(())
+    }
+
+    /// implementation of init should just invoke a call to initialize
+    async fn init(&self) -> anyhow::Result<()> {
+        // self.validate()?;
+        self.initialize().await
+    }
+
+    #[allow(unused)]
+    fn validate(&self) -> anyhow::Result<()> {
+        if CONFIG.get_storage_mode_string() == "Local drive" {
+            return Err(anyhow::anyhow!(
+                "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.",
+            ));
+        }
+
+        Ok(())
+    }
+}
+
+impl QueryServer {
+    // configure the api routes
+    fn configure_routes(config: &mut ServiceConfig, oidc_client: Option<OpenIdClient>) {
+        config
+            .service(
+                web::scope(&base_path())
+                    // POST "/query" ==> Get results of the SQL query passed in request body
+                    .service(Server::get_query_factory())
+                    .service(Server::get_liveness_factory())
+                    .service(Server::get_readiness_factory())
+                    .service(Server::get_about_factory())
+                    .service(Server::get_logstream_webscope())
+                    .service(Server::get_user_webscope())
+                    .service(Server::get_llm_webscope())
+                    .service(Server::get_oauth_webscope(oidc_client))
+                    .service(Server::get_user_role_webscope()),
+            )
+            .service(Server::get_generated());
+    }
+
+    // update the .query.json file and return the new IngesterMetadataArr
+    async fn get_ingestor_info() -> anyhow::Result<IngesterMetadataArr> {
+        let store = CONFIG.storage().get_object_store();
+
+        let root_path = RelativePathBuf::from("");
+        let arr = store
+            .get_objects(Some(&root_path))
+            .await?
+            .iter()
+            // this unwrap will most definitely shoot me in the foot later
+            .map(|x| serde_json::from_slice::<IngesterMetadata>(x).unwrap_or_default())
+            .collect_vec();
+
+        // TODO: add validation logic here
+        // validate the ingester metadata
+
+        let mut f = Self::get_meta_file().await;
+        // write the arr into f
+        let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
+        Ok(arr)
+    }
+
+    pub async fn check_liveness(uri: Url) -> bool {
+        let reqw = reqwest::Client::new()
+            .get(uri)
+            .header(header::CONTENT_TYPE, "application/json")
+            .send()
+            .await;
+
+        reqw.is_ok()
+    }
+
+    /// initialize the server, run migrations as needed and start the server
+    async fn initialize(&self) -> anyhow::Result<()> {
+        migration::run_metadata_migration(&CONFIG).await?;
+        let metadata = storage::resolve_parseable_metadata().await?;
+        tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
+        banner::print(&CONFIG, &metadata).await;
+
+        // initialize the rbac map
+        rbac::map::init(&metadata);
+
+        // keep metadata info in mem
+        metadata.set_global();
+
+        let prometheus = metrics::build_metrics_handler();
+        CONFIG.storage().register_store_metrics(&prometheus);
+
+        migration::run_migration(&CONFIG).await?;
+
+        // when do we do this
+        let storage = CONFIG.storage().get_object_store();
+        if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
+            log::warn!("could not populate local metadata. {:?}", e);
+        }
+
+        // track all parquet files already in the data directory
+        storage::retention::load_retention_from_global();
+
+        // load data from stats back to prometheus metrics
+        metrics::fetch_stats_from_storage().await;
+
+        // all internal data structures populated now.
+        // start the analytics scheduler if enabled
+        if CONFIG.parseable.send_analytics {
+            analytics::init_analytics_scheduler();
+        }
+
+        // spawn the sync thread
+        // tokio::spawn(Self::sync_ingestor_metadata());
+
+        self.start(prometheus, CONFIG.parseable.openid.clone())
+            .await?;
+
+        Ok(())
+    }
+
+    #[allow(dead_code)]
+    async fn sync_ingestor_metadata() {
+        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60 / 10));
+        loop {
+            interval.tick().await;
+            Self::get_ingestor_info().await.unwrap();
+        }
+    }
+
+    async fn get_meta_file() -> TokioFile {
+        let meta_path = CONFIG.staging_dir().join(".query.json");
+
+        tokio::fs::OpenOptions::new()
+            .read(true)
+            .write(true)
+            .open(meta_path)
+            .await
+            .unwrap()
+    }
+}
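For reference, the liveness URL assembled in start() above resolves as follows — the domain is assumed, and written here without a trailing slash so the concatenation stays obvious:

    fn main() -> Result<(), url::ParseError> {
        let domain_name = "http://127.0.0.1:8000"; // ingester.domain_name, assumed
        let base_path = "/api/v1";
        let uri = url::Url::parse(&format!("{domain_name}{base_path}/liveness"))?;
        assert_eq!(uri.as_str(), "http://127.0.0.1:8000/api/v1/liveness");
        Ok(())
    }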
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
new file mode 100644
index 000000000..84b9d8e90
--- /dev/null
+++ b/server/src/handlers/http/modal/server.rs
@@ -0,0 +1,467 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::analytics;
+use crate::banner;
+use crate::handlers;
+use crate::handlers::http::about;
+use crate::handlers::http::base_path;
+use crate::handlers::http::health_check;
+use crate::handlers::http::query;
+use crate::handlers::http::API_BASE_PATH;
+use crate::handlers::http::API_VERSION;
+use crate::localcache::LocalCacheManager;
+use crate::metadata;
+use crate::metrics;
+use crate::migration;
+use crate::rbac;
+use crate::storage;
+use crate::sync;
+use std::{fs::File, io::BufReader, sync::Arc};
+
+use actix_web::web::resource;
+use actix_web::Resource;
+use actix_web::Scope;
+use actix_web::{web, App, HttpServer};
+use actix_web_prometheus::PrometheusMetrics;
+use actix_web_static_files::ResourceFiles;
+use async_trait::async_trait;
+
+use rustls::{Certificate, PrivateKey, ServerConfig};
+use rustls_pemfile::{certs, pkcs8_private_keys};
+
+use crate::{
+    handlers::http::{
+        self, cross_origin_config, ingest, llm, logstream,
+        middleware::{DisAllowRootUser, RouteExt},
+        oidc, role, MAX_EVENT_PAYLOAD_SIZE,
+    },
+    option::CONFIG,
+    rbac::role::Action,
+};
+
+use super::generate;
+use super::OpenIdClient;
+use super::ParseableServer;
+
+#[derive(Default)]
+pub struct Server;
+
+#[async_trait(?Send)]
+impl ParseableServer for Server {
+    async fn start(
+        &self,
+        prometheus: PrometheusMetrics,
+        oidc_client: Option<crate::oidc::OpenidConfig>,
+    ) -> anyhow::Result<()> {
+        let oidc_client = match oidc_client {
+            Some(config) => {
+                let client = config
+                    .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
+                    .await?;
+                Some(Arc::new(client))
+            }
+            None => None,
+        };
+
+        let create_app_fn = move || {
+            App::new()
+                .wrap(prometheus.clone())
+                .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone()))
+                .wrap(actix_web::middleware::Logger::default())
+                .wrap(actix_web::middleware::Compress::default())
+                .wrap(cross_origin_config())
+        };
+
+        let ssl_acceptor = match (
+            &CONFIG.parseable.tls_cert_path,
+            &CONFIG.parseable.tls_key_path,
+        ) {
+            (Some(cert), Some(key)) => {
+                // init server config builder with safe defaults
+                let config = ServerConfig::builder()
+                    .with_safe_defaults()
+                    .with_no_client_auth();
+
+                // load TLS key/cert files
+                let cert_file = &mut BufReader::new(File::open(cert)?);
+                let key_file = &mut BufReader::new(File::open(key)?);
+
+                // convert files to key/cert objects
+                let cert_chain = certs(cert_file)?.into_iter().map(Certificate).collect();
+
+                let mut keys: Vec<PrivateKey> = pkcs8_private_keys(key_file)?
+                    .into_iter()
+                    .map(PrivateKey)
+                    .collect();
+
+                // exit if no keys could be parsed
+                if keys.is_empty() {
+                    anyhow::bail!("Could not locate PKCS 8 private keys.");
+                }
+
+                let server_config = config.with_single_cert(cert_chain, keys.remove(0))?;
+
+                Some(server_config)
+            }
+            (_, _) => None,
+        };
+
+        // concurrent workers equal to number of cores on the cpu
+        let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
+        if let Some(config) = ssl_acceptor {
+            http_server
+                .bind_rustls(&CONFIG.parseable.address, config)?
+                .run()
+                .await?;
+        } else {
+            http_server.bind(&CONFIG.parseable.address)?.run().await?;
+        }
+
+        Ok(())
+    }
+
+    /// implementation of init should just invoke a call to initialize
+    async fn init(&self) -> anyhow::Result<()> {
+        self.initialize().await
+    }
+
+    fn validate(&self) -> anyhow::Result<()> {
+        Ok(())
+    }
+}
+
+impl Server {
+    fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option<OpenIdClient>) {
+        // there might be a bug in the configure routes method
+        config
+            .service(
+                web::scope(&base_path())
+                    // POST "/query" ==> Get results of the SQL query passed in request body
+                    .service(Self::get_query_factory())
+                    .service(Self::get_ingest_factory())
+                    .service(Self::get_liveness_factory())
+                    .service(Self::get_readiness_factory())
+                    .service(Self::get_about_factory())
+                    .service(Self::get_logstream_webscope())
+                    .service(Self::get_user_webscope())
+                    .service(Self::get_llm_webscope())
+                    .service(Self::get_oauth_webscope(oidc_client))
+                    .service(Self::get_user_role_webscope()),
+            )
+            .service(Self::get_generated());
+    }
+
+    // get the query factory
+    pub fn get_query_factory() -> Resource {
+        web::resource("/query").route(web::post().to(query::query).authorize(Action::Query))
+    }
+
+    // get the logstream web scope
+    pub fn get_logstream_webscope() -> Scope {
+        web::scope("/logstream")
+            .service(
+                // GET "/logstream" ==> Get list of all Log Streams on the server
+                web::resource("")
+                    .route(web::get().to(logstream::list).authorize(Action::ListStream)),
+            )
+            .service(
+                web::scope("/{logstream}")
+                    .service(
+                        web::resource("")
+                            // PUT "/logstream/{logstream}" ==> Create log stream
+                            .route(
+                                web::put()
+                                    .to(logstream::put_stream)
+                                    .authorize_for_stream(Action::CreateStream),
+                            )
+                            // POST "/logstream/{logstream}" ==> Post logs to given log stream
+                            .route(
+                                web::post()
+                                    .to(ingest::post_event)
+                                    .authorize_for_stream(Action::Ingest),
+                            )
+                            // DELETE "/logstream/{logstream}" ==> Delete log stream
+                            .route(
+                                web::delete()
+                                    .to(logstream::delete)
+                                    .authorize_for_stream(Action::DeleteStream),
+                            )
+                            .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
+                    )
+                    .service(
+                        web::resource("/alert")
+                            // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
+                            .route(
+                                web::put()
+                                    .to(logstream::put_alert)
+                                    .authorize_for_stream(Action::PutAlert),
+                            )
+                            // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
+                            .route(
+                                web::get()
+                                    .to(logstream::get_alert)
+                                    .authorize_for_stream(Action::GetAlert),
+                            ),
+                    )
+                    .service(
+                        // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
+                        web::resource("/schema").route(
+                            web::get()
+                                .to(logstream::schema)
+                                .authorize_for_stream(Action::GetSchema),
+                        ),
+                    )
+                    .service(
+                        // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
+                        web::resource("/stats").route(
+                            web::get()
+                                .to(logstream::get_stats)
+                                .authorize_for_stream(Action::GetStats),
+                        ),
+                    )
+                    .service(
+                        web::resource("/retention")
+                            // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream
+                            .route(
+                                web::put()
+                                    .to(logstream::put_retention)
+                                    .authorize_for_stream(Action::PutRetention),
+                            )
+                            // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream
+                            .route(
+                                web::get()
+                                    .to(logstream::get_retention)
+                                    .authorize_for_stream(Action::GetRetention),
+                            ),
+                    )
+                    .service(
+                        web::resource("/cache")
+                            // PUT "/logstream/{logstream}/cache" ==> Enable cache for given logstream
+                            .route(
+                                web::put()
+                                    .to(logstream::put_enable_cache)
+                                    .authorize_for_stream(Action::PutCacheEnabled),
+                            )
+                            // GET "/logstream/{logstream}/cache" ==> Get cache setting for given logstream
+                            .route(
+                                web::get()
+                                    .to(logstream::get_cache_enabled)
+                                    .authorize_for_stream(Action::GetCacheEnabled),
+                            ),
+                    ),
+            )
+    }
+
+    // get the factory for the ingest route
+    pub fn get_ingest_factory() -> Resource {
+        web::resource("/ingest")
+            .route(
+                web::post()
+                    .to(ingest::ingest)
+                    .authorize_for_stream(Action::Ingest),
+            )
+            .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
+    }
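As a smoke test of the factory above, a client can post an event to the ingest route. A hedged sketch: the X-P-Stream header and the credentials here are assumptions, not taken from this diff.

    // hypothetical client-side call against a locally running server
    async fn post_demo_event() -> anyhow::Result<()> {
        let res = reqwest::Client::new()
            .post("http://localhost:8000/api/v1/ingest")
            .header("X-P-Stream", "demo") // stream selection header, assumed
            .basic_auth("admin", Some("admin"))
            .json(&serde_json::json!([{"level": "info", "message": "hello"}]))
            .send()
            .await?;
        assert!(res.status().is_success());
        Ok(())
    }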
+
+    // get the oauth webscope
+    pub fn get_oauth_webscope(oidc_client: Option<OpenIdClient>) -> Scope {
+        let oauth = web::scope("/o")
+            .service(resource("/login").route(web::get().to(oidc::login)))
+            .service(resource("/logout").route(web::get().to(oidc::logout)))
+            .service(resource("/code").route(web::get().to(oidc::reply_login)));
+
+        if let Some(client) = oidc_client {
+            oauth.app_data(web::Data::from(client))
+        } else {
+            oauth
+        }
+    }
+
+    // get the role webscope
+    pub fn get_user_role_webscope() -> Scope {
+        web::scope("/role")
+            // GET Role List
+            .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole)))
+            .service(
+                // PUT and GET Default Role
+                resource("/default")
+                    .route(web::put().to(role::put_default).authorize(Action::PutRole))
+                    .route(web::get().to(role::get_default).authorize(Action::GetRole)),
+            )
+            .service(
+                // PUT, GET, DELETE Roles
+                resource("/{name}")
+                    .route(web::put().to(role::put).authorize(Action::PutRole))
+                    .route(web::delete().to(role::delete).authorize(Action::DeleteRole))
+                    .route(web::get().to(role::get).authorize(Action::GetRole)),
+            )
+    }
+
+    // get the user webscope
+    pub fn get_user_webscope() -> Scope {
+        web::scope("/user")
+            .service(
+                web::resource("")
+                    // GET /user => List all users
+                    .route(
+                        web::get()
+                            .to(http::rbac::list_users)
+                            .authorize(Action::ListUser),
+                    ),
+            )
+            .service(
+                web::resource("/{username}")
+                    // PUT /user/{username} => Create a new user
+                    .route(
+                        web::post()
+                            .to(http::rbac::post_user)
+                            .authorize(Action::PutUser),
+                    )
+                    // DELETE /user/{username} => Delete a user
+                    .route(
+                        web::delete()
+                            .to(http::rbac::delete_user)
+                            .authorize(Action::DeleteUser),
+                    )
+                    .wrap(DisAllowRootUser),
+            )
+            .service(
+                web::resource("/{username}/role")
+                    // 
+    // get the oauth webscope
+    pub fn get_oauth_webscope(oidc_client: Option<OpenIdClient>) -> Scope {
+        let oauth = web::scope("/o")
+            .service(resource("/login").route(web::get().to(oidc::login)))
+            .service(resource("/logout").route(web::get().to(oidc::logout)))
+            .service(resource("/code").route(web::get().to(oidc::reply_login)));
+
+        if let Some(client) = oidc_client {
+            oauth.app_data(web::Data::from(client))
+        } else {
+            oauth
+        }
+    }
+
+    // get the role webscope
+    pub fn get_user_role_webscope() -> Scope {
+        web::scope("/role")
+            // GET Role List
+            .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole)))
+            .service(
+                // PUT and GET Default Role
+                resource("/default")
+                    .route(web::put().to(role::put_default).authorize(Action::PutRole))
+                    .route(web::get().to(role::get_default).authorize(Action::GetRole)),
+            )
+            .service(
+                // PUT, GET, DELETE Roles
+                resource("/{name}")
+                    .route(web::put().to(role::put).authorize(Action::PutRole))
+                    .route(web::delete().to(role::delete).authorize(Action::DeleteRole))
+                    .route(web::get().to(role::get).authorize(Action::GetRole)),
+            )
+    }
+
+    // get the user webscope
+    pub fn get_user_webscope() -> Scope {
+        web::scope("/user")
+            .service(
+                web::resource("")
+                    // GET /user => List all users
+                    .route(
+                        web::get()
+                            .to(http::rbac::list_users)
+                            .authorize(Action::ListUser),
+                    ),
+            )
+            .service(
+                web::resource("/{username}")
+                    // POST /user/{username} => Create a new user
+                    .route(
+                        web::post()
+                            .to(http::rbac::post_user)
+                            .authorize(Action::PutUser),
+                    )
+                    // DELETE /user/{username} => Delete a user
+                    .route(
+                        web::delete()
+                            .to(http::rbac::delete_user)
+                            .authorize(Action::DeleteUser),
+                    )
+                    .wrap(DisAllowRootUser),
+            )
+            .service(
+                web::resource("/{username}/role")
+                    // PUT /user/{username}/role => Put roles for user
+                    .route(
+                        web::put()
+                            .to(http::rbac::put_role)
+                            .authorize(Action::PutUserRoles)
+                            .wrap(DisAllowRootUser),
+                    )
+                    .route(
+                        web::get()
+                            .to(http::rbac::get_role)
+                            .authorize_for_user(Action::GetUserRoles),
+                    ),
+            )
+            .service(
+                web::resource("/{username}/generate-new-password")
+                    // POST /user/{username}/generate-new-password => reset password for this user
+                    .route(
+                        web::post()
+                            .to(http::rbac::post_gen_password)
+                            .authorize(Action::PutUser)
+                            .wrap(DisAllowRootUser),
+                    ),
+            )
+    }
+
+    // get the llm webscope
+    pub fn get_llm_webscope() -> Scope {
+        web::scope("/llm").service(
+            web::resource("").route(
+                web::post()
+                    .to(llm::make_llm_request)
+                    .authorize(Action::QueryLLM),
+            ),
+        )
+    }
+
+    // get the liveness check
+    // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
+    pub fn get_liveness_factory() -> Resource {
+        web::resource("/liveness").route(web::get().to(health_check::liveness))
+    }
+
+    // get the readiness check
+    // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
+    pub fn get_readiness_factory() -> Resource {
+        web::resource("/readiness").route(web::get().to(health_check::readiness))
+    }
+
+    // get the about factory
+    pub fn get_about_factory() -> Resource {
+        web::resource("/about").route(web::get().to(about::about).authorize(Action::GetAbout))
+    }
+
+    // GET "/" ==> Serve the static frontend directory
+    pub fn get_generated() -> ResourceFiles {
+        ResourceFiles::new("/", generate()).resolve_not_found_to_root()
+    }
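The `.authorize(...)` / `.authorize_for_stream(...)` calls used throughout these factories are extension methods defined elsewhere in the codebase; they wrap a handler with an RBAC check for the given `Action`. A self-contained sketch of the underlying idea (names and shape illustrative, not the actual middleware):

    // Illustrative only: a route guard that requires a specific Action.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum Action { Query, Ingest, ListStream }

    pub struct Authorized<H> {
        handler: H,
        required: Action,
    }

    impl<H> Authorized<H> {
        pub fn new(handler: H, required: Action) -> Self {
            Self { handler, required }
        }
        // dispatch to the handler only if the caller holds the required action
        pub fn allows(&self, granted: &[Action]) -> bool {
            granted.contains(&self.required)
        }
    }

    fn main() {
        let route = Authorized::new("query_handler", Action::Query);
        println!("guarding {}", route.handler);
        assert!(route.allows(&[Action::Query, Action::ListStream]));
        assert!(!route.allows(&[Action::Ingest]));
    }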
Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + remote_sync_handler.join().unwrap_or(()); + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync(); + } + }; + } + } +} diff --git a/server/src/handlers/http/modal/ssl_acceptor.rs b/server/src/handlers/http/modal/ssl_acceptor.rs new file mode 100644 index 000000000..6b51113b1 --- /dev/null +++ b/server/src/handlers/http/modal/ssl_acceptor.rs @@ -0,0 +1,54 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{fs::File, io::BufReader, path::PathBuf}; + +use itertools::Itertools; +use rustls::{Certificate, PrivateKey, ServerConfig}; +use rustls_pemfile::{certs, pkcs8_private_keys}; + +pub fn get_ssl_acceptor( + tls_cert: &Option, + tls_key: &Option, +) -> anyhow::Result> { + match (tls_cert, tls_key) { + (Some(cert), Some(key)) => { + let server_config = ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth(); + + let cert_file = &mut BufReader::new(File::open(cert)?); + let key_file = &mut BufReader::new(File::open(key)?); + let cert_chain = certs(cert_file)?.into_iter().map(Certificate).collect_vec(); + + let mut keys = pkcs8_private_keys(key_file)? 
diff --git a/server/src/main.rs b/server/src/main.rs
index ef0cb2cc6..60bca9fcf 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -16,20 +16,12 @@
  */
 
-use clokwerk::{AsyncScheduler, Job, Scheduler, TimeUnits};
-use thread_priority::{ThreadBuilder, ThreadPriority};
-use tokio::sync::oneshot;
-use tokio::sync::oneshot::error::TryRecvError;
-
-use std::panic::{catch_unwind, AssertUnwindSafe};
-use std::thread::{self, JoinHandle};
-use std::time::Duration;
-
 mod about;
 mod alerts;
 mod analytics;
 mod banner;
 mod catalog;
+mod cli;
 mod event;
 mod handlers;
 mod livetail;
@@ -44,161 +36,43 @@ mod rbac;
 mod response;
 mod stats;
 mod storage;
+mod sync;
 mod utils;
 mod validator;
 
-use option::CONFIG;
+use std::sync::Arc;
+
+use handlers::http::modal::ParseableServer;
+use option::{Mode, CONFIG};
 
-use crate::localcache::LocalCacheManager;
+use crate::{
+    handlers::http::modal::{
+        ingest_server::IngestServer, query_server::QueryServer, server::Server,
+    },
+    // localcache::LocalCacheManager,
+};
 
 pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
 
 #[actix_web::main]
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
-    let storage = CONFIG.storage().get_object_store();
-    CONFIG.validate().await?;
-    migration::run_metadata_migration(&CONFIG).await?;
-    let metadata = storage::resolve_parseable_metadata().await?;
-    banner::print(&CONFIG, &metadata).await;
-    rbac::map::init(&metadata);
-    metadata.set_global();
-    if let Some(cache_manager) = LocalCacheManager::global() {
-        cache_manager
-            .validate(CONFIG.parseable.local_cache_size)
-            .await?;
-    };
-    let prometheus = metrics::build_metrics_handler();
-    CONFIG.storage().register_store_metrics(&prometheus);
-
-    migration::run_migration(&CONFIG).await?;
-
-    if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
-        log::warn!("could not populate local metadata. {:?}", e);
-    }
-
-    // track all parquet files already in the data directory
-    storage::retention::load_retention_from_global();
-    // load data from stats back to prometheus metrics
-    metrics::load_from_stats_from_storage().await;
-
-    let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
-    let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
-        object_store_sync();
-
-    // all internal data structures populated now.
-    // start the analytics scheduler if enabled
-    if CONFIG.parseable.send_analytics {
-        analytics::init_analytics_scheduler();
-    }
+    CONFIG.validate_storage().await?;
 
-    tokio::spawn(handlers::livetail::server());
+    // the server structs are zero-sized types, so these Arcs have a minimal memory footprint
+    let server: Arc<dyn ParseableServer> = match CONFIG.parseable.mode {
+        Mode::Query => Arc::new(QueryServer),
 
-    let app = handlers::http::run_http(prometheus, CONFIG.parseable.openid.clone());
-    tokio::pin!(app);
-    loop {
-        tokio::select! {
-            e = &mut app => {
-                // actix server finished .. stop other threads and stop the server
-                remote_sync_inbox.send(()).unwrap_or(());
-                localsync_inbox.send(()).unwrap_or(());
-                localsync_handler.join().unwrap_or(());
-                remote_sync_handler.join().unwrap_or(());
-                return e
-            },
-            _ = &mut localsync_outbox => {
-                // crash the server if localsync fails for any reason
-                // panic!("Local Sync thread died. Server will fail now!")
-                return Err(anyhow::Error::msg("Failed to sync local data to drive. 
Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - remote_sync_handler.join().unwrap_or(()); - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync(); - } + Mode::Ingest => Arc::new(IngestServer), - }; - } -} - -fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { - let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); - let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); - let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let handle = thread::spawn(move || { - let res = catch_unwind(move || { - let rt = actix_web::rt::System::new(); - rt.block_on(async { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(STORAGE_UPLOAD_INTERVAL.seconds()) - // Extra time interval is added so that this schedular does not race with local sync. - .plus(5u32.seconds()) - .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().sync().await { - log::warn!("failed to sync local data with object store. {:?}", e); - } - }); - - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - scheduler.run_pending().await; - match AssertUnwindSafe(|| inbox_rx.try_recv())() { - Ok(_) => break, - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Closed) => { - // should be unreachable but breaking anyways - break; - } - } - } - }) - }); - - if res.is_err() { - outbox_tx.send(()).unwrap(); - } - }); - - (handle, outbox_rx, inbox_tx) -} - -fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { - let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); - let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); - let mut inbox_rx = AssertUnwindSafe(inbox_rx); - - let handle = ThreadBuilder::default() - .name("local-sync") - .priority(ThreadPriority::Max) - .spawn(move |priority_result| { - if priority_result.is_err() { - log::warn!("Max priority cannot be set for sync thread. 
Make sure that user/program is allowed to set thread priority.")
-            }
-            let res = catch_unwind(move || {
-                let mut scheduler = Scheduler::new();
-                scheduler
-                    .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
-                    .run(move || crate::event::STREAM_WRITERS.unset_all());
+        Mode::All => Arc::new(Server),
+    };
 
-                loop {
-                    thread::sleep(Duration::from_millis(50));
-                    scheduler.run_pending();
-                    match AssertUnwindSafe(|| inbox_rx.try_recv())() {
-                        Ok(_) => break,
-                        Err(TryRecvError::Empty) => continue,
-                        Err(TryRecvError::Closed) => {
-                            // should be unreachable but breaking anyways
-                            break;
-                        }
-                    }
-                }
-            });
+    // TODO: add graceful-shutdown logic when
+    // MODE == Query / Ingest and storage == local-store
+    // option.rs ln: 161
+    // CONFIG.run_time_mode_validation()?;
 
-            if res.is_err() {
-                outbox_tx.send(()).unwrap();
-            }
-        })
-        .unwrap();
+    server.init().await?;
 
-    (handle, outbox_rx, inbox_tx)
+    Ok(())
 }
diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs
index 05e6baf86..513bf2540 100644
--- a/server/src/metrics/mod.rs
+++ b/server/src/metrics/mod.rs
@@ -133,7 +133,7 @@ fn prom_process_metrics(metrics: &PrometheusMetrics) {
 #[cfg(not(target_os = "linux"))]
 fn prom_process_metrics(_metrics: &PrometheusMetrics) {}
 
-pub async fn load_from_stats_from_storage() {
+pub async fn fetch_stats_from_storage() {
     for stream_name in STREAM_INFO.list_streams() {
         let stats = CONFIG
             .storage()
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 5484e84c3..eb3b98c0e 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -29,7 +29,10 @@ use serde::Serialize;
 
 use crate::{
     option::Config,
-    storage::{ObjectStorage, ObjectStorageError},
+    storage::{
+        ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME,
+        STREAM_METADATA_FILE_NAME,
+    },
 };
 
 /// Migrate the metadata from v1 or v2 to v3
@@ -46,6 +49,7 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
         .and_then(|version| version.as_str())
     }
 
+    // if storage metadata is none, do nothing
     if let Some(storage_metadata) = storage_metadata {
         match get_version(&storage_metadata) {
             Some("v1") => {
@@ -60,6 +64,7 @@
         }
     }
 
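The version gate above is the heart of the migration: read the "version" key out of the JSON metadata and pick a migration path. A condensed, self-contained sketch of that dispatch:

    // Sketch of the version dispatch; v1/v2/v3 mirror the match arms above.
    fn get_version(metadata: &serde_json::Value) -> Option<&str> {
        metadata.get("version").and_then(|v| v.as_str())
    }

    fn main() {
        let meta = serde_json::json!({ "version": "v1", "staging": "./staging" });
        match get_version(&meta) {
            Some("v1") | Some("v2") => println!("migrate to v3"),
            Some("v3") => println!("already current"),
            _ => println!("unknown or missing version; leave untouched"),
        }
    }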
RelativePathBuf::from_iter([stream, ".schema"]); + let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]); let schema = storage.get_object(&schema_path).await?; let schema = serde_json::from_slice(&schema)?; let map = schema_migration::v2_v3(schema)?; @@ -138,7 +143,7 @@ fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { } pub fn get_staging_metadata(config: &Config) -> anyhow::Result> { - let path = config.staging_dir().join(".parseable.json"); + let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let bytes = match std::fs::read(path) { Ok(bytes) => bytes, Err(err) => match err.kind() { @@ -153,7 +158,7 @@ pub fn get_staging_metadata(config: &Config) -> anyhow::Result anyhow::Result> { - let path = RelativePathBuf::from_iter([".parseable.json"]); + let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); match storage.get_object(&path).await { Ok(bytes) => Ok(Some( serde_json::from_slice(&bytes).expect("parseable config is valid json"), @@ -172,13 +177,13 @@ pub async fn put_remote_metadata( storage: &dyn ObjectStorage, metadata: &serde_json::Value, ) -> anyhow::Result<()> { - let path = RelativePathBuf::from_iter([".parseable.json"]); + let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); let metadata = serde_json::to_vec(metadata)?.into(); Ok(storage.put_object(&path, metadata).await?) } pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> { - let path = config.staging_dir().join(".parseable.json"); + let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let mut file = OpenOptions::new() .create(true) .truncate(true) diff --git a/server/src/option.rs b/server/src/option.rs index 5d713f28b..99bff7e66 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -17,18 +17,17 @@ */ use clap::error::ErrorKind; -use clap::{command, value_parser, Arg, ArgGroup, Args, Command, FromArgMatches}; +use clap::{command, Args, Command, FromArgMatches}; use once_cell::sync::Lazy; use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use std::env; use std::path::PathBuf; use std::sync::Arc; -use url::Url; -use crate::oidc::{self, OpenidConfig}; +use crate::cli::Cli; +use crate::storage::PARSEABLE_METADATA_FILE_NAME; use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config}; - pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB pub const JOIN_COMMUNITY: &str = "Join us on Parseable Slack community for questions : https://logg.ing/community"; @@ -36,18 +35,18 @@ pub static CONFIG: Lazy> = Lazy::new(|| Arc::new(Config::new())); #[derive(Debug)] pub struct Config { - pub parseable: Server, + pub parseable: Cli, storage: Arc, pub storage_name: &'static str, } impl Config { fn new() -> Self { - let cli = parseable_cli_command().get_matches(); + let cli = create_parseable_cli_command().get_matches(); match cli.subcommand() { Some(("local-store", m)) => { - let server = match Server::from_arg_matches(m) { - Ok(server) => server, + let cli = match Cli::from_arg_matches(m) { + Ok(cli) => cli, Err(err) => err.exit(), }; let storage = match FSConfig::from_arg_matches(m) { @@ -55,8 +54,8 @@ impl Config { Err(err) => err.exit(), }; - if server.local_staging_path == storage.root { - parseable_cli_command() + if cli.local_staging_path == storage.root { + create_parseable_cli_command() .error( ErrorKind::ValueValidation, "Cannot use same path for storage and staging", @@ -64,8 +63,8 @@ impl Config { .exit() } - if server.local_cache_path.is_some() { - 
diff --git a/server/src/option.rs b/server/src/option.rs
index 5d713f28b..99bff7e66 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -17,18 +17,17 @@
  */
 
 use clap::error::ErrorKind;
-use clap::{command, value_parser, Arg, ArgGroup, Args, Command, FromArgMatches};
+use clap::{command, Args, Command, FromArgMatches};
 use once_cell::sync::Lazy;
 use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
 use std::env;
 use std::path::PathBuf;
 use std::sync::Arc;
-use url::Url;
 
-use crate::oidc::{self, OpenidConfig};
+use crate::cli::Cli;
+use crate::storage::PARSEABLE_METADATA_FILE_NAME;
 use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
-
 pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GB
 pub const JOIN_COMMUNITY: &str =
     "Join us on Parseable Slack community for questions : https://logg.ing/community";
@@ -36,18 +35,18 @@ pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));
 
 #[derive(Debug)]
 pub struct Config {
-    pub parseable: Server,
+    pub parseable: Cli,
     storage: Arc<dyn ObjectStorageProvider + Send + Sync>,
     pub storage_name: &'static str,
 }
 
 impl Config {
     fn new() -> Self {
-        let cli = parseable_cli_command().get_matches();
+        let cli = create_parseable_cli_command().get_matches();
         match cli.subcommand() {
             Some(("local-store", m)) => {
-                let server = match Server::from_arg_matches(m) {
-                    Ok(server) => server,
+                let cli = match Cli::from_arg_matches(m) {
+                    Ok(cli) => cli,
                     Err(err) => err.exit(),
                 };
                 let storage = match FSConfig::from_arg_matches(m) {
@@ -55,8 +54,8 @@ impl Config {
                     Err(err) => err.exit(),
                 };
 
-                if server.local_staging_path == storage.root {
-                    parseable_cli_command()
+                if cli.local_staging_path == storage.root {
+                    create_parseable_cli_command()
                         .error(
                             ErrorKind::ValueValidation,
                             "Cannot use same path for storage and staging",
@@ -64,8 +63,8 @@ impl Config {
                         .exit()
                 }
 
-                if server.local_cache_path.is_some() {
-                    parseable_cli_command()
+                if cli.local_cache_path.is_some() {
+                    create_parseable_cli_command()
                         .error(
                             ErrorKind::ValueValidation,
                             "Cannot use cache with local-store subcommand.",
@@ -74,14 +73,14 @@
                 }
 
                 Config {
-                    parseable: server,
+                    parseable: cli,
                     storage: Arc::new(storage),
                     storage_name: "drive",
                 }
             }
             Some(("s3-store", m)) => {
-                let server = match Server::from_arg_matches(m) {
-                    Ok(server) => server,
+                let cli = match Cli::from_arg_matches(m) {
+                    Ok(cli) => cli,
                     Err(err) => err.exit(),
                 };
                 let storage = match S3Config::from_arg_matches(m) {
@@ -90,7 +89,7 @@
                 };
 
                 Config {
-                    parseable: server,
+                    parseable: cli,
                     storage: Arc::new(storage),
                     storage_name: "s3",
                 }
@@ -99,9 +98,11 @@
         }
     }
 
-    pub async fn validate(&self) -> Result<(), ObjectStorageError> {
+    // validate the storage configuration: that a proper staging directory path
+    // and a proper data directory (or S3 bucket) have been provided
+    pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> {
         let obj_store = self.storage.get_object_store();
-        let rel_path = relative_path::RelativePathBuf::from(".parseable.json");
+        let rel_path = relative_path::RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
 
         let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();
 
@@ -118,7 +119,7 @@
             return Ok(());
         }
 
-        if self.mode_string() == "Local drive" {
+        if self.get_storage_mode_string() == "Local drive" {
             return Err(ObjectStorageError::Custom(format!("Could not start the server because directory '{}' contains stale data, please use an empty directory, and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY)));
         }
 
@@ -143,34 +144,45 @@
     }
 
     pub fn is_default_creds(&self) -> bool {
-        self.parseable.username == Server::DEFAULT_USERNAME
-            && self.parseable.password == Server::DEFAULT_PASSWORD
+        self.parseable.username == Cli::DEFAULT_USERNAME
+            && self.parseable.password == Cli::DEFAULT_PASSWORD
     }
 
     // returns the string representation of the storage mode
    // drive --> Local drive
    // s3 --> S3 bucket
-    pub fn mode_string(&self) -> &str {
-        let mut mode = "S3 bucket";
+    pub fn get_storage_mode_string(&self) -> &str {
         if self.storage_name == "drive" {
-            mode = "Local drive";
+            return "Local drive";
         }
-        mode
+        "S3 bucket"
+    }
+
+    #[allow(dead_code)]
+    pub fn run_time_mode_validation(&self) -> anyhow::Result<()> {
+        let check = (self.parseable.mode == Mode::Ingest || self.parseable.mode == Mode::Query)
+            && self.storage_name == "drive";
+
+        if check {
+            anyhow::bail!(format!("Cannot start the server in {} mode with local storage; please use an S3 bucket for storage", self.parseable.mode.to_str()))
+        }
+
+        Ok(())
+    }
 }
 
-fn parseable_cli_command() -> Command {
-    let local = Server::get_clap_command("local-store");
+fn create_parseable_cli_command() -> Command {
+    let local = Cli::create_cli_command_with_clap("local-store");
     let local = <FSConfig as Args>::augment_args_for_update(local);
 
     let local = local
-        .mut_arg(Server::USERNAME, |arg| {
-            arg.required(false).default_value(Server::DEFAULT_USERNAME)
+        .mut_arg(Cli::USERNAME, |arg| {
+            arg.required(false).default_value(Cli::DEFAULT_USERNAME)
         })
-        .mut_arg(Server::PASSWORD, |arg| {
-            arg.required(false).default_value(Server::DEFAULT_PASSWORD)
+        .mut_arg(Cli::PASSWORD, |arg| {
+            arg.required(false).default_value(Cli::DEFAULT_PASSWORD)
         });
-    let s3 = Server::get_clap_command("s3-store");
+    let s3 = Cli::create_cli_command_with_clap("s3-store");
    let s3 = <S3Config as Args>::augment_args_for_update(s3);
 
    command!()
@@ -190,428 +202,6 @@ fn
parseable_cli_command() -> Command { .subcommands([local, s3]) } -#[derive(Debug, Default)] -pub struct Server { - /// The location of TLS Cert file - pub tls_cert_path: Option, - - /// The location of TLS Private Key file - pub tls_key_path: Option, - - /// The address on which the http server will listen. - pub address: String, - - /// Base domain under which server is hosted. - /// This information is used by OIDC to refer redirects - pub domain_address: Option, - - /// The local staging path is used as a temporary landing point - /// for incoming events and local cache - pub local_staging_path: PathBuf, - - /// The local cache path is used for speeding up query on latest data - pub local_cache_path: Option, - - /// Size for local cache - pub local_cache_size: u64, - - /// Username for the basic authentication on the server - pub username: String, - - /// Password for the basic authentication on the server - pub password: String, - - /// OpenId configuration - pub openid: Option, - - /// Server should check for update or not - pub check_update: bool, - - /// Server should send anonymous analytics or not - pub send_analytics: bool, - - /// Open AI access key - pub open_ai_key: Option, - - /// Livetail port - pub grpc_port: u16, - - /// Livetail channel capacity - pub livetail_channel_capacity: usize, - - /// Rows in Parquet Rowgroup - pub row_group_size: usize, - - /// Query memory limit in bytes - pub query_memory_pool_size: Option, - - /// Parquet compression algorithm - pub parquet_compression: Compression, - - /// Mode of operation - pub mode: Mode, -} - -impl FromArgMatches for Server { - fn from_arg_matches(m: &clap::ArgMatches) -> Result { - let mut s: Self = Self::default(); - s.update_from_arg_matches(m)?; - Ok(s) - } - - fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { - self.local_cache_path = m.get_one::(Self::CACHE).cloned(); - self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); - self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); - self.domain_address = m.get_one::(Self::DOMAIN_URI).cloned(); - let openid_client_id = m.get_one::(Self::OPENID_CLIENT_ID).cloned(); - let openid_client_secret = m.get_one::(Self::OPENID_CLIENT_SECRET).cloned(); - let openid_issuer = m.get_one::(Self::OPENID_ISSUER).cloned(); - - self.address = m - .get_one::(Self::ADDRESS) - .cloned() - .expect("default value for address"); - self.local_staging_path = m - .get_one::(Self::STAGING) - .cloned() - .expect("default value for staging"); - self.local_cache_size = m - .get_one::(Self::CACHE_SIZE) - .cloned() - .expect("default value for cache size"); - self.username = m - .get_one::(Self::USERNAME) - .cloned() - .expect("default for username"); - self.password = m - .get_one::(Self::PASSWORD) - .cloned() - .expect("default for password"); - self.check_update = m - .get_one::(Self::CHECK_UPDATE) - .cloned() - .expect("default for check update"); - self.send_analytics = m - .get_one::(Self::SEND_ANALYTICS) - .cloned() - .expect("default for send analytics"); - self.open_ai_key = m.get_one::(Self::OPEN_AI_KEY).cloned(); - self.grpc_port = m - .get_one::(Self::GRPC_PORT) - .cloned() - .expect("default for livetail port"); - self.livetail_channel_capacity = m - .get_one::(Self::LIVETAIL_CAPACITY) - .cloned() - .expect("default for livetail capacity"); - // converts Gib to bytes before assigning - self.query_memory_pool_size = m - .get_one::(Self::QUERY_MEM_POOL_SIZE) - .cloned() - .map(|gib| gib as usize * 1024usize.pow(3)); - self.row_group_size = m - 
.get_one::(Self::ROW_GROUP_SIZE) - .cloned() - .expect("default for row_group size"); - self.parquet_compression = match m - .get_one::(Self::PARQUET_COMPRESSION_ALGO) - .expect("default for compression algo") - .as_str() - { - "uncompressed" => Compression::UNCOMPRESSED, - "snappy" => Compression::SNAPPY, - "gzip" => Compression::GZIP, - "lzo" => Compression::LZO, - "brotli" => Compression::BROTLI, - "lz4" => Compression::LZ4, - "zstd" => Compression::ZSTD, - _ => unreachable!(), - }; - - self.openid = match (openid_client_id, openid_client_secret, openid_issuer) { - (Some(id), Some(secret), Some(issuer)) => { - let origin = if let Some(url) = self.domain_address.clone() { - oidc::Origin::Production(url) - } else { - oidc::Origin::Local { - socket_addr: self.address.clone(), - https: self.tls_cert_path.is_some() && self.tls_key_path.is_some(), - } - }; - Some(OpenidConfig { - id, - secret, - issuer, - origin, - }) - } - _ => None, - }; - - self.mode = match m - .get_one::(Self::MODE) - .expect("Mode not set") - .as_str() - { - "query" => Mode::Query, - "ingest" => Mode::Ingest, - "all" => Mode::All, - _ => unreachable!(), - }; - - Ok(()) - } -} - -impl Server { - // identifiers for arguments - pub const TLS_CERT: &'static str = "tls-cert-path"; - pub const TLS_KEY: &'static str = "tls-key-path"; - pub const ADDRESS: &'static str = "address"; - pub const DOMAIN_URI: &'static str = "origin"; - pub const STAGING: &'static str = "local-staging-path"; - pub const CACHE: &'static str = "cache-path"; - pub const CACHE_SIZE: &'static str = "cache-size"; - pub const USERNAME: &'static str = "username"; - pub const PASSWORD: &'static str = "password"; - pub const CHECK_UPDATE: &'static str = "check-update"; - pub const SEND_ANALYTICS: &'static str = "send-analytics"; - pub const OPEN_AI_KEY: &'static str = "open-ai-key"; - pub const OPENID_CLIENT_ID: &'static str = "oidc-client"; - pub const OPENID_CLIENT_SECRET: &'static str = "oidc-client-secret"; - pub const OPENID_ISSUER: &'static str = "oidc-issuer"; - pub const GRPC_PORT: &'static str = "grpc-port"; - pub const LIVETAIL_CAPACITY: &'static str = "livetail-capacity"; - // todo : what should this flag be - pub const QUERY_MEM_POOL_SIZE: &'static str = "query-mempool-size"; - pub const ROW_GROUP_SIZE: &'static str = "row-group-size"; - pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo"; - pub const MODE: &'static str = "mode"; - pub const DEFAULT_USERNAME: &'static str = "admin"; - pub const DEFAULT_PASSWORD: &'static str = "admin"; - - pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { - self.local_staging_path.join(stream_name) - } - - pub fn get_scheme(&self) -> String { - if self.tls_cert_path.is_some() && self.tls_key_path.is_some() { - return "https".to_string(); - } - "http".to_string() - } - - pub fn get_clap_command(name: &'static str) -> Command { - Command::new(name).next_line_help(false) - .arg( - Arg::new(Self::TLS_CERT) - .long(Self::TLS_CERT) - .env("P_TLS_CERT_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where certificate file is located. Required to enable TLS"), - ) - .arg( - Arg::new(Self::TLS_KEY) - .long(Self::TLS_KEY) - .env("P_TLS_KEY_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where private key file is located. 
Required to enable TLS"), - ) - .arg( - Arg::new(Self::ADDRESS) - .long(Self::ADDRESS) - .env("P_ADDR") - .value_name("ADDR:PORT") - .default_value("0.0.0.0:8000") - .value_parser(validation::socket_addr) - .help("Address and port for Parseable HTTP(s) server"), - ) - .arg( - Arg::new(Self::STAGING) - .long(Self::STAGING) - .env("P_STAGING_DIR") - .value_name("DIR") - .default_value("./staging") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used as landing point for incoming events") - .next_line_help(true), - ) - .arg( - Arg::new(Self::CACHE) - .long(Self::CACHE) - .env("P_CACHE_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for caching data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::CACHE_SIZE) - .long(Self::CACHE_SIZE) - .env("P_CACHE_SIZE") - .value_name("size") - .default_value("1GiB") - .value_parser(validation::cache_size) - .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") - .next_line_help(true), - ) - .arg( - Arg::new(Self::USERNAME) - .long(Self::USERNAME) - .env("P_USERNAME") - .value_name("STRING") - .required(true) - .help("Admin username to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::PASSWORD) - .long(Self::PASSWORD) - .env("P_PASSWORD") - .value_name("STRING") - .required(true) - .help("Admin password to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::CHECK_UPDATE) - .long(Self::CHECK_UPDATE) - .env("P_CHECK_UPDATE") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable checking for new Parseable release"), - ) - .arg( - Arg::new(Self::SEND_ANALYTICS) - .long(Self::SEND_ANALYTICS) - .env("P_SEND_ANONYMOUS_USAGE_DATA") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable anonymous telemetry data collection"), - ) - .arg( - Arg::new(Self::OPEN_AI_KEY) - .long(Self::OPEN_AI_KEY) - .env("P_OPENAI_API_KEY") - .value_name("STRING") - .required(false) - .help("OpenAI key to enable llm features"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_ID) - .long(Self::OPENID_CLIENT_ID) - .env("P_OIDC_CLIENT_ID") - .value_name("STRING") - .required(false) - .help("Client id for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_SECRET) - .long(Self::OPENID_CLIENT_SECRET) - .env("P_OIDC_CLIENT_SECRET") - .value_name("STRING") - .required(false) - .help("Client secret for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_ISSUER) - .long(Self::OPENID_ISSUER) - .env("P_OIDC_ISSUER") - .value_name("URl") - .required(false) - .value_parser(validation::url) - .help("OIDC provider's host address"), - ) - .arg( - Arg::new(Self::DOMAIN_URI) - .long(Self::DOMAIN_URI) - .env("P_ORIGIN_URI") - .value_name("URL") - .required(false) - .value_parser(validation::url) - .help("Parseable server global domain address"), - ) - .arg( - Arg::new(Self::GRPC_PORT) - .long(Self::GRPC_PORT) - .env("P_GRPC_PORT") - .value_name("PORT") - .default_value("8001") - .required(false) - .value_parser(value_parser!(u16)) - .help("Port for gRPC server"), - ) - .arg( - Arg::new(Self::LIVETAIL_CAPACITY) - .long(Self::LIVETAIL_CAPACITY) - .env("P_LIVETAIL_CAPACITY") - .value_name("NUMBER") - .default_value("1000") - .required(false) - .value_parser(value_parser!(usize)) - .help("Number of rows in livetail channel"), - ) - .arg( - 
Arg::new(Self::QUERY_MEM_POOL_SIZE) - .long(Self::QUERY_MEM_POOL_SIZE) - .env("P_QUERY_MEMORY_LIMIT") - .value_name("Gib") - .required(false) - .value_parser(value_parser!(u8)) - .help("Set a fixed memory limit for query"), - ) - .arg( - Arg::new(Self::ROW_GROUP_SIZE) - .long(Self::ROW_GROUP_SIZE) - .env("P_PARQUET_ROW_GROUP_SIZE") - .value_name("NUMBER") - .required(false) - .default_value("16384") - .value_parser(value_parser!(usize)) - .help("Number of rows in a row group"), - ).arg( - Arg::new(Self::MODE) - .long(Self::MODE) - .env("P_MODE") - .value_name("STRING") - .required(false) - .default_value("all") - .value_parser([ - "query", - "ingest", - "all"]) - .help("Mode of operation"), - ) - .arg( - Arg::new(Self::PARQUET_COMPRESSION_ALGO) - .long(Self::PARQUET_COMPRESSION_ALGO) - .env("P_PARQUET_COMPRESSION_ALGO") - .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]") - .required(false) - .default_value("lz4") - .value_parser([ - "uncompressed", - "snappy", - "gzip", - "lzo", - "brotli", - "lz4", - "zstd"]) - .help("Parquet compression algorithm"), - ).group( - ArgGroup::new("oidc") - .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .multiple(true) - ) - } -} - #[derive(Debug, Default, Eq, PartialEq)] pub enum Mode { Query, @@ -620,6 +210,16 @@ pub enum Mode { All, } +impl Mode { + pub fn to_str(&self) -> &str { + match self { + Mode::Query => "Query Server", + Mode::Ingest => "Ingest Server", + Mode::All => "All", + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] #[allow(non_camel_case_types, clippy::upper_case_acronyms)] pub enum Compression { diff --git a/server/src/storage.rs b/server/src/storage.rs index 975fcf445..347f12a74 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -39,6 +39,13 @@ pub use store_metadata::{ pub use self::staging::StorageDir; +// metadata file names in a Stream prefix +pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; +pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; +pub const SCHEMA_FILE_NAME: &str = ".schema"; +pub const ALERT_FILE_NAME: &str = ".alert.json"; +pub const MANIFEST_FILE: &str = "manifest.json"; + /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. 
pub const LOCAL_SYNC_INTERVAL: u64 = 60;
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index e0880cff0..bce837d58 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -34,7 +34,9 @@ use tokio_stream::wrappers::ReadDirStream;
 
 use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::option::validation;
-use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
+use super::{
+    LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, STREAM_METADATA_FILE_NAME,
+};
 
 #[derive(Debug, Clone, clap::Args)]
 #[command(
@@ -74,6 +76,7 @@ impl ObjectStorageProvider for FSConfig {
 }
 
 pub struct LocalFS {
+    // absolute path of the data directory
     root: PathBuf,
 }
 
@@ -110,6 +113,47 @@ impl ObjectStorage for LocalFS {
         res
     }
 
+    async fn get_objects(
+        &self,
+        base_path: Option<&RelativePath>,
+    ) -> Result<Vec<Bytes>, ObjectStorageError> {
+        let time = Instant::now();
+
+        let prefix = if let Some(path) = base_path {
+            path.to_path(&self.root)
+        } else {
+            self.root.clone()
+        };
+
+        let mut entries = fs::read_dir(&prefix).await?;
+        let mut res = Vec::new();
+        while let Some(entry) = entries.next_entry().await? {
+            let ingestor_file = entry
+                .path()
+                .file_name()
+                .unwrap_or_default()
+                .to_str()
+                .unwrap_or_default()
+                .contains("ingestor");
+
+            if !ingestor_file {
+                continue;
+            }
+
+            let file = fs::read(entry.path()).await?;
+            res.push(file.into());
+        }
+
+        // report "400" when nothing matched, "200" otherwise; maybe change the return code
+        let status = if res.is_empty() { "400" } else { "200" };
+        let time = time.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["GET", status])
+            .observe(time);
+
+        Ok(res)
+    }
+
     async fn put_object(
         &self,
         path: &RelativePath,
@@ -228,6 +272,16 @@ impl ObjectStorage for LocalFS {
     fn store_url(&self) -> url::Url {
         url::Url::parse("file:///").unwrap()
     }
+
+    fn get_bucket_name(&self) -> String {
+        self.root
+            .iter()
+            .last()
+            .unwrap()
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
 }
 
 async fn dir_with_stream(
@@ -248,7 +302,7 @@
     if entry.file_type().await?.is_dir() {
         let path = entry.path();
-        let stream_json_path = path.join(object_storage::STREAM_METADATA_FILE_NAME);
+        let stream_json_path = path.join(STREAM_METADATA_FILE_NAME);
         if stream_json_path.exists() {
             Ok(Some(dir_name))
         } else {
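The filter in `get_objects` keys purely off the file name. A std-only sketch of the same check (the directory path is illustrative):

    use std::fs;
    use std::io;

    // keep only directory entries whose file name contains "ingestor"
    fn ingestor_files(dir: &str) -> io::Result<Vec<std::path::PathBuf>> {
        let mut out = Vec::new();
        for entry in fs::read_dir(dir)? {
            let path = entry?.path();
            let name = path.file_name().and_then(|n| n.to_str()).unwrap_or_default();
            if name.contains("ingestor") {
                out.push(path);
            }
        }
        Ok(out)
    }

    fn main() -> io::Result<()> {
        for f in ingestor_files("./staging")? {
            println!("{}", f.display());
        }
        Ok(())
    }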
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 7494d16e1..53c6779e7 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -20,7 +20,12 @@ use super::{
     retention::Retention, staging::convert_disk_files_to_parquet, LogStream, ObjectStorageError,
     ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
 };
+use super::{
+    ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME,
+    STREAM_METADATA_FILE_NAME,
+};
 
+use crate::utils::get_address;
 use crate::{
     alerts::Alerts,
     catalog::{self, manifest::Manifest, snapshot::Snapshot},
@@ -49,13 +54,6 @@ use std::{
     time::{Duration, Instant},
 };
 
-// metadata file names in a Stream prefix
-pub(super) const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
-pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
-const SCHEMA_FILE_NAME: &str = ".schema";
-const ALERT_FILE_NAME: &str = ".alert.json";
-const MANIFEST_FILE: &str = "manifest.json";
-
 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
     fn get_datafusion_runtime(&self) -> RuntimeConfig;
     fn get_object_store(&self) -> Arc<dyn ObjectStorage>;
@@ -66,6 +64,11 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
 #[async_trait]
 pub trait ObjectStorage: Sync + 'static {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
+    // want to make this more generic with a filter function
+    async fn get_objects(
+        &self,
+        base_path: Option<&RelativePath>,
+    ) -> Result<Vec<Bytes>, ObjectStorageError>;
     async fn put_object(
         &self,
         path: &RelativePath,
@@ -84,7 +87,7 @@ pub trait ObjectStorage: Sync + 'static {
     async fn get_latency(&self) -> Duration {
         // It's Ok to `unwrap` here. The hardcoded value will always Result in
         // an `Ok`.
-        let path = RelativePathBuf::from_path(".parseable.json").unwrap();
+        let path = RelativePathBuf::from_path(PARSEABLE_METADATA_FILE_NAME).unwrap();
         let start = Instant::now();
         let _ = self.get_object(&path).await;
@@ -264,6 +267,7 @@
         }
     }
 
+    // get the manifest info
     async fn get_manifest(
         &self,
         path: &RelativePath,
@@ -292,6 +296,7 @@
         self.put_object(&path, to_bytes(&manifest)).await
     }
 
+    // gets the snapshot of the stream
     async fn get_snapshot(&self, stream: &str) -> Result<Snapshot, ObjectStorageError> {
         let path = stream_json_path(stream);
         let bytes = self.get_object(&path).await?;
@@ -402,6 +407,9 @@
         Ok(())
     }
+
+    // TODO: pick a better name
+    fn get_bucket_name(&self) -> String;
 }
 
 async fn commit_schema_to_storage(
@@ -443,5 +451,7 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf {
 
 #[inline(always)]
 fn manifest_path(prefix: &str) -> RelativePathBuf {
-    RelativePathBuf::from_iter([prefix, MANIFEST_FILE])
+    let addr = get_address();
+    let manifest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
+    RelativePathBuf::from_iter([prefix, &manifest_file_name])
 }
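With this change each server writes its own manifest, named by its address, so per-ingestor manifests for the same time partition do not collide. A sketch of the resulting naming scheme (the address is an illustrative stand-in for `CONFIG.parseable.address`):

    use std::net::SocketAddr;

    const MANIFEST_FILE: &str = "manifest.json";

    // prefix the manifest file name with the server's ip and port
    fn manifest_file_name(address: &str) -> String {
        let addr: SocketAddr = address.parse().expect("valid ADDR:PORT");
        format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE)
    }

    fn main() {
        // prints "0.0.0.0.8000.manifest.json"
        println!("{}", manifest_file_name("0.0.0.0:8000"));
    }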
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index ef1144f18..edf342f9b 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -42,7 +42,7 @@ use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
 
 use super::metrics_layer::MetricLayer;
-use super::{object_storage, ObjectStorageProvider};
+use super::{ObjectStorageProvider, PARSEABLE_METADATA_FILE_NAME, STREAM_METADATA_FILE_NAME};
 
 // in bytes
 const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
@@ -197,6 +197,7 @@ impl ObjectStorageProvider for S3Config {
         Arc::new(S3 {
             client: s3,
             bucket: self.bucket_name.clone(),
+            root: StorePath::from(""),
         })
     }
 
@@ -209,20 +210,21 @@
     }
 }
 
-fn to_path(path: &RelativePath) -> StorePath {
+fn to_object_store_path(path: &RelativePath) -> StorePath {
     StorePath::from(path.as_str())
 }
 
 pub struct S3 {
     client: LimitStore<AmazonS3>,
     bucket: String,
+    root: StorePath,
 }
 
 impl S3 {
     async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         let instant = Instant::now();
 
-        let resp = self.client.get(&to_path(path)).await;
+        let resp = self.client.get(&to_object_store_path(path)).await;
 
         match resp {
             Ok(resp) => {
@@ -249,7 +251,7 @@
         resource: Bytes,
     ) -> Result<(), ObjectStorageError> {
         let time = Instant::now();
-        let resp = self.client.put(&to_path(path), resource).await;
+        let resp = self.client.put(&to_object_store_path(path), resource).await;
         let status = if resp.is_ok() { "200" } else { "400" };
         let time = time.elapsed().as_secs_f64();
         REQUEST_RESPONSE_TIME
@@ -304,7 +306,7 @@
         let stream_json_check = FuturesUnordered::new();
 
         for dir in &dirs {
-            let key = format!("{}/{}", dir, object_storage::STREAM_METADATA_FILE_NAME);
+            let key = format!("{}/{}", dir, STREAM_METADATA_FILE_NAME);
             let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
             stream_json_check.push(task);
         }
@@ -403,6 +405,53 @@ impl ObjectStorage for S3 {
         Ok(self._get_object(path).await?)
     }
 
+    // TBD: is this the right way, or are the API calls too many?
+    async fn get_objects(
+        &self,
+        base_path: Option<&RelativePath>,
+    ) -> Result<Vec<Bytes>, ObjectStorageError> {
+        let instant = Instant::now();
+
+        let prefix = if let Some(base_path) = base_path {
+            to_object_store_path(base_path)
+        } else {
+            self.root.clone()
+        };
+
+        let mut list_stream = self.client.list(Some(&prefix)).await?;
+
+        let mut res = vec![];
+
+        while let Some(meta) = list_stream.next().await.transpose()? {
+            let ingestor_file = meta
+                .location
+                .filename()
+                .unwrap_or_default()
+                .contains("ingestor");
+
+            if !ingestor_file {
+                continue;
+            }
+
+            let bytes = self
+                .get_object(
+                    RelativePath::from_path(meta.location.as_ref()).map_err(|err| {
+                        ObjectStorageError::Custom(format!("Error while getting files: {}", err))
+                    })?,
+                )
+                .await?;
+
+            res.push(bytes);
+        }
+
+        let instant = instant.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["GET", "200"])
+            .observe(instant);
+
+        Ok(res)
+    }
+
     async fn put_object(
         &self,
         path: &RelativePath,
@@ -424,7 +473,7 @@
     async fn check(&self) -> Result<(), ObjectStorageError> {
         Ok(self
             .client
-            .head(&object_storage::PARSEABLE_METADATA_FILE_NAME.into())
+            .head(&PARSEABLE_METADATA_FILE_NAME.into())
             .await
             .map(|_| ())?)
     }
@@ -482,6 +531,10 @@
             .map(|name| name.as_ref().to_string())
             .collect::<Vec<_>>())
     }
+
+    fn get_bucket_name(&self) -> String {
+        self.bucket.clone()
+    }
 }
 
 impl From<object_store::Error> for ObjectStorageError {
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index 31c5dffed..e3d6d0fbb 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -20,6 +20,7 @@
 use std::{
     collections::HashMap,
     fs,
+    net::SocketAddr,
     path::{Path, PathBuf},
     process,
     sync::Arc,
@@ -159,6 +160,16 @@ impl StorageDir {
 
 fn arrow_path_to_parquet(path: &Path) -> PathBuf {
     let filename = path.file_name().unwrap().to_str().unwrap();
     let (_, filename) = filename.split_once('.').unwrap();
+
+    let port = CONFIG
+        .parseable
+        .address
+        .clone()
+        .parse::<SocketAddr>()
+        .unwrap()
+        .port();
+    let filename = filename.rsplit_once('.').unwrap();
+    let filename = format!("{}.{}.{}", filename.0, port, filename.1);
     let mut parquet_path = path.to_owned();
     parquet_path.set_file_name(filename);
     parquet_path.set_extension("parquet");
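The staging change splices the server port into the parquet file name, mirroring the address-qualified manifests. A self-contained sketch of the same rewrite (the file-name shape is illustrative):

    use std::path::{Path, PathBuf};

    // drop the leading dot-segment, insert the port before the final
    // segment, and switch the extension to .parquet
    fn arrow_to_parquet(path: &Path, port: u16) -> PathBuf {
        let filename = path.file_name().unwrap().to_str().unwrap();
        // e.g. "abc123.front.arrows" -> "front.arrows"
        let (_, rest) = filename.split_once('.').unwrap();
        // e.g. "front.arrows" -> "front.8000.arrows"
        let (stem, ext) = rest.rsplit_once('.').unwrap();
        let renamed = format!("{stem}.{port}.{ext}");
        let mut parquet = path.to_owned();
        parquet.set_file_name(renamed);
        parquet.set_extension("parquet");
        parquet
    }

    fn main() {
        let p = Path::new("/staging/demo/abc123.date=2024-01-01.hour=10.arrows");
        println!("{}", arrow_to_parquet(p, 8000).display());
    }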
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index b7d3a52f8..3c051ac62 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -26,13 +26,13 @@ use once_cell::sync::OnceCell;
 use std::io;
 
 use crate::{
-    option::{CONFIG, JOIN_COMMUNITY},
+    option::{Mode, CONFIG, JOIN_COMMUNITY},
     rbac::{role::model::DefaultPrivilege, user::User},
     storage::ObjectStorageError,
     utils::uid,
 };
 
-use super::object_storage::PARSEABLE_METADATA_FILE_NAME;
+use super::PARSEABLE_METADATA_FILE_NAME;
 
 // Expose some static variables for internal usage
 pub static STORAGE_METADATA: OnceCell = OnceCell::new();
@@ -92,6 +92,7 @@ impl StorageMetadata {
     }
 }
 
+/// deals with the staging directory creation and metadata resolution
 /// always returns remote metadata as it is the source of truth
 /// overwrites staging metadata while updating storage info
 pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
@@ -130,15 +131,26 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError>
+            match CONFIG.parseable.mode {
+                Mode::All | Mode::Query => overwrite_remote = true,
+                _ => {
+                    metadata.staging = CONFIG.staging_dir().to_path_buf();
+                },
+            }
             Ok(metadata)
         }
         EnvChange::CreateBoth => {
             create_dir_all(CONFIG.staging_dir())?;
             let metadata = StorageMetadata::new();
-            // new metadata needs to be set on both staging and remote
-            overwrite_remote = true;
+            // new metadata needs to be set
+            // if mode is query or all, then on both staging and remote
+            match CONFIG.parseable.mode {
+                Mode::All | Mode::Query => overwrite_remote = true,
+                _ => (),
+            }
+            // else only staging
             overwrite_staging = true;
             Ok(metadata)
         }
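The pattern in both arms is the same: which copies of `.parseable.json` a node may overwrite depends on its mode. Schematically (a sketch mirroring the match arms above, not the actual function):

    enum Mode { Query, Ingest, All }

    // returns (overwrite_remote, overwrite_staging): only Query and All nodes
    // may rewrite the remote metadata; Ingest nodes only refresh staging
    fn overwrite_targets(mode: &Mode) -> (bool, bool) {
        match mode {
            Mode::All | Mode::Query => (true, true),
            Mode::Ingest => (false, true),
        }
    }

    fn main() {
        assert_eq!(overwrite_targets(&Mode::All), (true, true));
        assert_eq!(overwrite_targets(&Mode::Query), (true, true));
        assert_eq!(overwrite_targets(&Mode::Ingest), (false, true));
    }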
{:?}", e); + } + }); + + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + scheduler.run_pending().await; + match AssertUnwindSafe(|| inbox_rx.try_recv())() { + Ok(_) => break, + Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Closed) => { + // should be unreachable but breaking anyways + break; + } + } + } + }) + }); + + if res.is_err() { + outbox_tx.send(()).unwrap(); + } + }); + + (handle, outbox_rx, inbox_tx) +} + +pub(crate) fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); + let mut inbox_rx = AssertUnwindSafe(inbox_rx); + + let handle = ThreadBuilder::default() + .name("local-sync") + .priority(ThreadPriority::Max) + .spawn(move |priority_result| { + if priority_result.is_err() { + log::warn!("Max priority cannot be set for sync thread. Make sure that user/program is allowed to set thread priority.") + } + let res = catch_unwind(move || { + let mut scheduler = Scheduler::new(); + scheduler + .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) + .run(move || crate::event::STREAM_WRITERS.unset_all()); + + loop { + thread::sleep(Duration::from_millis(50)); + scheduler.run_pending(); + match AssertUnwindSafe(|| inbox_rx.try_recv())() { + Ok(_) => break, + Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Closed) => { + // should be unreachable but breaking anyways + break; + } + } + } + }); + + if res.is_err() { + outbox_tx.send(()).unwrap(); + } + }) + .unwrap(); + + (handle, outbox_rx, inbox_tx) +} diff --git a/server/src/utils.rs b/server/src/utils.rs index 83af01cc6..530f2b21d 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -23,8 +23,12 @@ pub mod json; pub mod uid; pub mod update; +use std::net::{IpAddr, SocketAddr}; + use chrono::{DateTime, NaiveDate, Timelike, Utc}; +use crate::option::CONFIG; + #[allow(dead_code)] pub fn hostname() -> Option { hostname::get() @@ -222,6 +226,12 @@ impl TimePeriod { } } +#[inline(always)] +pub fn get_address() -> (IpAddr, u16) { + let addr = CONFIG.parseable.address.parse::().unwrap(); + (addr.ip(), addr.port()) +} + #[cfg(test)] mod tests { use chrono::DateTime; diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs index 8a31ae200..ef76ddf3f 100644 --- a/server/src/utils/arrow/merged_reader.rs +++ b/server/src/utils/arrow/merged_reader.rs @@ -17,12 +17,11 @@ * */ -use std::{fs::File, io::BufReader, path::PathBuf, sync::Arc}; - use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_ipc::reader::StreamReader; use arrow_schema::Schema; use itertools::kmerge_by; +use std::{fs::File, io::BufReader, path::PathBuf, sync::Arc}; use super::{ adapt_batch,