diff --git a/src/about.rs b/src/about.rs index 1897e1299..36b4afffc 100644 --- a/src/about.rs +++ b/src/about.rs @@ -136,7 +136,7 @@ fn print_latest_release(latest_release: update::LatestRelease) { pub async fn print(config: &Config, meta: &StorageMetadata) { // print current version let current = current(); - let latest_release = if config.parseable.check_update { + let latest_release = if config.options.check_update { update::get_latest(&meta.deployment_id).await.ok() } else { None diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 121bc6bb8..ffa7314d0 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -103,8 +103,8 @@ impl Alert { ) -> Context { let deployment_instance = format!( "{}://{}", - CONFIG.parseable.get_scheme(), - CONFIG.parseable.address + CONFIG.options.get_scheme(), + CONFIG.options.address ); let deployment_id = storage::StorageMetadata::global().deployment_id; let deployment_mode = storage::StorageMetadata::global().mode.to_string(); diff --git a/src/analytics.rs b/src/analytics.rs index 85b14b87a..a5e65f99b 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -112,7 +112,7 @@ impl Report { memory_total_bytes: mem_total, platform: platform().to_string(), storage_mode: CONFIG.get_storage_mode_string().to_string(), - server_mode: CONFIG.parseable.mode, + server_mode: CONFIG.options.mode, version: current().released_version.to_string(), commit_hash: current().commit_hash, active_ingestors: ingestor_metrics.0, @@ -219,7 +219,7 @@ async fn fetch_ingestors_metrics( let mut vec = vec![]; let mut active_ingestors = 0u64; let mut offline_ingestors = 0u64; - if CONFIG.parseable.mode == Mode::Query { + if CONFIG.options.mode == Mode::Query { // send analytics for ingest servers // ingestor infos should be valid here, if not some thing is wrong diff --git a/src/audit.rs b/src/audit.rs index 9016efc30..766413bab 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -47,12 +47,7 @@ impl AuditLogger { // Try to construct the log endpoint URL by joining the base URL // with the ingest path, This can fail if the URL is not valid, // when the base URL is not set or the ingest path is not valid - let log_endpoint = match CONFIG - .parseable - .audit_logger - .as_ref()? 
- .join("/api/v1/ingest") - { + let log_endpoint = match CONFIG.options.audit_logger.as_ref()?.join("/api/v1/ingest") { Ok(url) => url, Err(err) => { eprintln!("Couldn't setup audit logger: {err}"); @@ -71,8 +66,8 @@ impl AuditLogger { .header("x-p-stream", "audit_log"); // Use basic auth if credentials are configured - if let Some(username) = CONFIG.parseable.audit_username.as_ref() { - req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref()) + if let Some(username) = CONFIG.options.audit_username.as_ref() { + req = req.basic_auth(username, CONFIG.options.audit_password.as_ref()) } match req.send().await { diff --git a/src/banner.rs b/src/banner.rs index a115cc5f9..00ebb7e81 100644 --- a/src/banner.rs +++ b/src/banner.rs @@ -25,7 +25,7 @@ use crate::{option::Config, storage::StorageMetadata}; pub async fn print(config: &Config, meta: &StorageMetadata) { print_ascii_art(); - let scheme = config.parseable.get_scheme(); + let scheme = config.options.get_scheme(); status_info(config, &scheme, meta.deployment_id); storage_info(config).await; about::print(config, meta).await; @@ -50,10 +50,10 @@ fn status_info(config: &Config, scheme: &str, id: Uid) { let address = format!( "\"{}://{}\" ({}), \":{}\" (livetail), \":{}\" (flight protocol)", scheme, - config.parseable.address, + config.options.address, scheme.to_ascii_uppercase(), - config.parseable.grpc_port, - config.parseable.flight_port + config.options.grpc_port, + config.options.flight_port ); let mut credentials = @@ -63,7 +63,7 @@ fn status_info(config: &Config, scheme: &str, id: Uid) { credentials = "\"Using default creds admin, admin. Please set credentials with P_USERNAME and P_PASSWORD.\"".red().to_string(); } - let llm_status = match &config.parseable.open_ai_key { + let llm_status = match &config.options.open_ai_key { Some(_) => "OpenAI Configured".green(), None => "Not Configured".grey(), }; @@ -107,7 +107,7 @@ async fn storage_info(config: &Config) { config.staging_dir().to_string_lossy(), ); - if let Some(path) = &config.parseable.hot_tier_storage_path { + if let Some(path) = &config.options.hot_tier_storage_path { eprintln!( "\ {:8}Hot Tier: \"Enabled, Path: {}\"", diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 8545841e0..0a07855f3 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -335,7 +335,7 @@ pub async fn remove_manifest_from_snapshot( STREAM_INFO.set_first_event_at(stream_name, None)?; storage.put_snapshot(stream_name, meta.snapshot).await?; } - match CONFIG.parseable.mode { + match CONFIG.options.mode { Mode::All | Mode::Ingest => { Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?) 
} @@ -349,7 +349,7 @@ pub async fn get_first_event( dates: Vec, ) -> Result, ObjectStorageError> { let mut first_event_at: String = String::default(); - match CONFIG.parseable.mode { + match CONFIG.options.mode { Mode::All | Mode::Ingest => { // get current snapshot let stream_first_event = STREAM_INFO.get_first_event(stream_name)?; diff --git a/src/cli.rs b/src/cli.rs index c1061cd65..6770c17a7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -16,14 +16,14 @@ * */ -use clap::{value_parser, Arg, ArgGroup, Command, FromArgMatches}; +use clap::Parser; use std::path::PathBuf; use url::Url; use crate::{ oidc::{self, OpenidConfig}, - option::{validation, Compression, Mode}, + option::{validation, Compression, Mode}, storage::{AzureBlobConfig, FSConfig, S3Config}, }; #[cfg(any( @@ -38,573 +38,374 @@ use crate::kafka::SslProtocol as KafkaSslProtocol; )))] use std::string::String as KafkaSslProtocol; -#[derive(Debug, Default)] +/// Default username and password for Parseable server, used by default for local mode. +/// NOTE: obviously not recommended for production +pub const DEFAULT_USERNAME: &str = "admin"; +pub const DEFAULT_PASSWORD: &str = "admin"; + + +#[derive(Parser)] +#[command( + name = "parseable", + bin_name = "parseable", + about = "Cloud Native, log analytics platform for modern applications.", + long_about = r#" +Cloud Native, log analytics platform for modern applications. + +Usage: +parseable [command] [options..] + + +Help: +parseable [command] --help + +"#, + arg_required_else_help = true, + color = clap::ColorChoice::Always, + version = env!("CARGO_PKG_VERSION"), + propagate_version = true, + next_line_help = false, + help_template = r#"{name} v{version} +{about} +Join the community at https://logg.ing/community. + +{all-args} + "#, + subcommand_required = true, +)] pub struct Cli { - /// The location of TLS Cert file - pub tls_cert_path: Option, - - /// The location of TLS Private Key file - pub tls_key_path: Option, + #[command(subcommand)] + pub storage: StorageOptions, +} - /// The location of other certificates to accept - pub trusted_ca_certs_path: Option, +#[derive(Parser)] +pub enum StorageOptions { + #[command(name = "local-store")] + Local(LocalStoreArgs), + + #[command(name = "s3-store")] + S3(S3StoreArgs), + + #[command(name = "blob-store")] + Blob(BlobStoreArgs), +} - /// The address on which the http server will listen. - pub address: String, +#[derive(Parser)] +pub struct LocalStoreArgs { + #[command(flatten)] + pub options: Options, + #[command(flatten)] + pub storage: FSConfig, +} - /// Base domain under which server is hosted. 
- /// This information is used by OIDC to refer redirects - pub domain_address: Option, +#[derive(Parser)] +pub struct S3StoreArgs { + #[command(flatten)] + pub options: Options, + #[command(flatten)] + pub storage: S3Config, +} - /// The local staging path is used as a temporary landing point - /// for incoming events - pub local_staging_path: PathBuf, +#[derive(Parser)] +pub struct BlobStoreArgs { + #[command(flatten)] + pub options: Options, + #[command(flatten)] + pub storage: AzureBlobConfig, +} - /// Username for the basic authentication on the server +#[derive(Parser, Debug)] +pub struct Options { + // Authentication + #[arg(long, env = "P_USERNAME", help = "Admin username to be set for this Parseable server", default_value = DEFAULT_USERNAME)] pub username: String, - /// Password for the basic authentication on the server + #[arg(long, env = "P_PASSWORD", help = "Admin password to be set for this Parseable server", default_value = DEFAULT_PASSWORD)] pub password: String, - /// OpenId configuration - pub openid: Option, + // Server configuration + #[arg( + long, + env = "P_ADDR", + default_value = "0.0.0.0:8000", + value_parser = validation::socket_addr, + help = "Address and port for Parseable HTTP(s) server" + )] + pub address: String, + + #[arg( + long = "origin", + env = "P_ORIGIN_URI", + value_parser = validation::url, + help = "Parseable server global domain address" + )] + pub domain_address: Option, + + #[arg( + long, + env = "P_MODE", + default_value = "all", + value_parser = validation::mode, + help = "Mode of operation" + )] + pub mode: Mode, - /// Server should check for update or not + #[arg( + long, + env = "P_CORS", + default_value = "true", + help = "Enable/Disable CORS, default disabled" + )] + pub cors: bool, + + #[arg( + long, + env = "P_CHECK_UPDATE", + default_value = "true", + help = "Enable/Disable checking for new Parseable release" + )] pub check_update: bool, - /// Server should send anonymous analytics or not + #[arg( + long, + env = "P_SEND_ANONYMOUS_USAGE_DATA", + default_value = "true", + help = "Enable/Disable anonymous telemetry data collection" + )] pub send_analytics: bool, - /// Open AI access key - pub open_ai_key: Option, - - /// Livetail port - pub grpc_port: u16, + // TLS/Security + #[arg( + long, + env = "P_TLS_CERT_PATH", + value_parser = validation::file_path, + help = "Local path on this device where certificate file is located. Required to enable TLS" + )] + pub tls_cert_path: Option, - /// Livetail channel capacity - pub livetail_channel_capacity: usize, + #[arg( + long, + env = "P_TLS_KEY_PATH", + value_parser = validation::file_path, + help = "Local path on this device where private key file is located. 
Required to enable TLS" + )] + pub tls_key_path: Option, - /// Rows in Parquet Rowgroup - pub row_group_size: usize, + #[arg( + long, + env = "P_TRUSTED_CA_CERTS_DIR", + value_parser = validation::canonicalize_path, + help = "Local path on this device where all trusted certificates are located" + )] + pub trusted_ca_certs_path: Option, - /// Query memory limit in bytes - pub query_memory_pool_size: Option, + // Storage configuration + #[arg( + long, + env = "P_STAGING_DIR", + default_value = "./staging", + value_parser = validation::canonicalize_path, + help = "Local path on this device to be used as landing point for incoming events" + )] + pub local_staging_path: PathBuf, - /// Parquet compression algorithm - pub parquet_compression: Compression, + #[arg( + long = "hot-tier-path", + env = "P_HOT_TIER_DIR", + value_parser = validation::canonicalize_path, + help = "Local path on this device to be used for hot tier data" + )] + pub hot_tier_storage_path: Option, - /// Mode of operation - pub mode: Mode, + #[arg( + long, + env = "P_MAX_DISK_USAGE_PERCENT", + default_value = "80.0", + value_parser = validation::validate_disk_usage, + help = "Maximum allowed disk usage in percentage e.g 90.0 for 90%" + )] + pub max_disk_usage: f64, - /// public address for the parseable server ingestor - pub ingestor_endpoint: String, + // Service ports + #[arg( + long, + env = "P_GRPC_PORT", + default_value = "8001", + help = "Port for gRPC server" + )] + pub grpc_port: u16, - /// port use by airplane(flight query service) + #[arg( + long, + env = "P_FLIGHT_PORT", + default_value = "8002", + help = "Port for Arrow Flight Querying Engine" + )] pub flight_port: u16, - /// CORS behaviour - pub cors: bool, + // Performance settings + #[arg( + long, + long = "livetail-capacity", + env = "P_LIVETAIL_CAPACITY", + default_value = "1000", + help = "Number of rows in livetail channel" + )] + pub livetail_channel_capacity: usize, - /// The local hot_tier path is used for optimising the query performance in the distributed systems - pub hot_tier_storage_path: Option, + #[arg( + long, + long = "query-mempool-size", + env = "P_QUERY_MEMORY_LIMIT", + help = "Set a fixed memory limit for query in GiB" + )] + pub query_memory_pool_size: Option, - ///maximum disk usage allowed - pub max_disk_usage: f64, + #[arg( + long, + env = "P_PARQUET_ROW_GROUP_SIZE", + default_value = "1048576", + help = "Number of rows in a row group" + )] + pub row_group_size: usize, - pub ms_clarity_tag: Option, + #[arg( + long = "compression-algo", + env = "P_PARQUET_COMPRESSION_ALGO", + default_value = "lz4", + value_parser = validation::compression, + help = "Parquet compression algorithm" + )] + pub parquet_compression: Compression, + + // Integration features + #[arg( + long, + env = "P_OPENAI_API_KEY", + help = "OpenAI key to enable llm features" + )] + pub open_ai_key: Option, - // Kafka specific env vars + #[arg( + long, + env = "P_INGESTOR_ENDPOINT", + default_value = "", + help = "URL to connect to this specific ingestor. 
Default is the address of the server" + )] + pub ingestor_endpoint: String, + + // OIDC Configuration + #[arg( + long, + long = "oidc-client", + env = "P_OIDC_CLIENT_ID", + requires = "oidc", + group = "oidc", + help = "Client id for OIDC provider" + )] + oidc_client_id: Option, + + #[arg( + long, + env = "P_OIDC_CLIENT_SECRET", + requires = "oidc", + group = "oidc", + help = "Client secret for OIDC provider" + )] + oidc_client_secret: Option, + + #[arg( + long, + env = "P_OIDC_ISSUER", + value_parser = validation::url, + requires = "oidc", + group = "oidc", + help = "OIDC provider's host address" + )] + oidc_issuer: Option, + + // Kafka configuration (conditionally compiled) + #[cfg(any( + all(target_os = "linux", target_arch = "x86_64"), + all(target_os = "macos", target_arch = "aarch64") + ))] + #[arg(long, env = "P_KAFKA_TOPICS", help = "Kafka topics to subscribe to")] pub kafka_topics: Option, + + #[cfg(any( + all(target_os = "linux", target_arch = "x86_64"), + all(target_os = "macos", target_arch = "aarch64") + ))] + #[arg(long, env = "P_KAFKA_HOST", help = "Address and port for Kafka server")] pub kafka_host: Option, + + #[cfg(any( + all(target_os = "linux", target_arch = "x86_64"), + all(target_os = "macos", target_arch = "aarch64") + ))] + #[arg(long, env = "P_KAFKA_GROUP", help = "Kafka group")] pub kafka_group: Option, + + #[cfg(any( + all(target_os = "linux", target_arch = "x86_64"), + all(target_os = "macos", target_arch = "aarch64") + ))] + #[arg(long, env = "P_KAFKA_CLIENT_ID", help = "Kafka client id")] pub kafka_client_id: Option, + + #[cfg(any( + all(target_os = "linux", target_arch = "x86_64"), + all(target_os = "macos", target_arch = "aarch64") + ))] + #[arg( + long, + env = "P_KAFKA_SECURITY_PROTOCOL", + value_parser = validation::kafka_security_protocol, + help = "Kafka security protocol" + )] pub kafka_security_protocol: Option, + + #[cfg(any( + all(target_os = "linux", target_arch = "x86_64"), + all(target_os = "macos", target_arch = "aarch64") + ))] + #[arg(long, env = "P_KAFKA_PARTITIONS", help = "Kafka partitions")] pub kafka_partitions: Option, - // Audit Logging env vars + // Audit logging + #[arg( + long, + env = "P_AUDIT_LOGGER", + value_parser = validation::url, + help = "Audit logger endpoint" + )] pub audit_logger: Option, + + #[arg(long ,env = "P_AUDIT_USERNAME", help = "Audit logger username")] pub audit_username: Option, + + #[arg(long ,env = "P_AUDIT_PASSWORD", help = "Audit logger password")] pub audit_password: Option, -} -impl Cli { - // identifiers for arguments - pub const TLS_CERT: &'static str = "tls-cert-path"; - pub const TLS_KEY: &'static str = "tls-key-path"; - pub const TRUSTED_CA_CERTS_PATH: &'static str = "trusted-ca-certs-path"; - pub const ADDRESS: &'static str = "address"; - pub const DOMAIN_URI: &'static str = "origin"; - pub const STAGING: &'static str = "local-staging-path"; - pub const USERNAME: &'static str = "username"; - pub const PASSWORD: &'static str = "password"; - pub const CHECK_UPDATE: &'static str = "check-update"; - pub const SEND_ANALYTICS: &'static str = "send-analytics"; - pub const OPEN_AI_KEY: &'static str = "open-ai-key"; - pub const OPENID_CLIENT_ID: &'static str = "oidc-client"; - pub const OPENID_CLIENT_SECRET: &'static str = "oidc-client-secret"; - pub const OPENID_ISSUER: &'static str = "oidc-issuer"; - pub const GRPC_PORT: &'static str = "grpc-port"; - pub const LIVETAIL_CAPACITY: &'static str = "livetail-capacity"; - // todo : what should this flag be - pub const QUERY_MEM_POOL_SIZE: &'static str = 
"query-mempool-size"; - pub const ROW_GROUP_SIZE: &'static str = "row-group-size"; - pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo"; - pub const MODE: &'static str = "mode"; - pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint"; - pub const DEFAULT_USERNAME: &'static str = "admin"; - pub const DEFAULT_PASSWORD: &'static str = "admin"; - pub const FLIGHT_PORT: &'static str = "flight-port"; - pub const CORS: &'static str = "cors"; - pub const HOT_TIER_PATH: &'static str = "hot-tier-path"; - pub const MAX_DISK_USAGE: &'static str = "max-disk-usage"; - pub const MS_CLARITY_TAG: &'static str = "ms-clarity-tag"; - - // Kafka specific env vars - pub const KAFKA_TOPICS: &'static str = "kafka-topics"; - pub const KAFKA_HOST: &'static str = "kafka-host"; - pub const KAFKA_GROUP: &'static str = "kafka-group"; - pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id"; - pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol"; - pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions"; - - pub const AUDIT_LOGGER: &'static str = "audit-logger"; - pub const AUDIT_USERNAME: &'static str = "audit-username"; - pub const AUDIT_PASSWORD: &'static str = "audit-password"; + #[arg(long ,env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")] + pub ms_clarity_tag: Option, +} +impl Options { pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } pub fn get_scheme(&self) -> String { if self.tls_cert_path.is_some() && self.tls_key_path.is_some() { - return "https".to_string(); + "https".to_string() + } else { + "http".to_string() } - "http".to_string() } - pub fn create_cli_command_with_clap(name: &'static str) -> Command { - Command::new(name).next_line_help(false) - .arg( - Arg::new(Self::KAFKA_TOPICS) - .long(Self::KAFKA_TOPICS) - .env("P_KAFKA_TOPICS") - .value_name("STRING") - .help("Kafka topics to subscribe to"), - ) - .arg( - Arg::new(Self::KAFKA_HOST) - .long(Self::KAFKA_HOST) - .env("P_KAFKA_HOST") - .value_name("STRING") - .help("Address and port for Kafka server"), - ) - .arg( - Arg::new(Self::KAFKA_GROUP) - .long(Self::KAFKA_GROUP) - .env("P_KAFKA_GROUP") - .value_name("STRING") - .help("Kafka group"), - ) - .arg( - Arg::new(Self::KAFKA_CLIENT_ID) - .long(Self::KAFKA_CLIENT_ID) - .env("P_KAFKA_CLIENT_ID") - .value_name("STRING") - .help("Kafka client id"), - ) - .arg( - Arg::new(Self::KAFKA_SECURITY_PROTOCOL) - .long(Self::KAFKA_SECURITY_PROTOCOL) - .env("P_KAFKA_SECURITY_PROTOCOL") - .value_name("STRING") - .help("Kafka security protocol"), - ) - .arg( - Arg::new(Self::KAFKA_PARTITIONS) - .long(Self::KAFKA_PARTITIONS) - .env("P_KAFKA_PARTITIONS") - .value_name("STRING") - .help("Kafka partitions"), - ) - .arg( - Arg::new(Self::AUDIT_LOGGER) - .long(Self::AUDIT_LOGGER) - .env("P_AUDIT_LOGGER") - .value_name("URL") - .required(false) - .value_parser(validation::url) - .help("Audit logger endpoint"), - ) - .arg( - Arg::new(Self::AUDIT_USERNAME) - .long(Self::AUDIT_USERNAME) - .env("P_AUDIT_USERNAME") - .value_name("STRING") - .help("Audit logger username"), - ) - .arg( - Arg::new(Self::AUDIT_PASSWORD) - .long(Self::AUDIT_PASSWORD) - .env("P_AUDIT_PASSWORD") - .value_name("STRING") - .help("Audit logger password"), - ) - .arg( - Arg::new(Self::TLS_CERT) - .long(Self::TLS_CERT) - .env("P_TLS_CERT_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where certificate file is located. 
Required to enable TLS"), - ) - .arg( - Arg::new(Self::TLS_KEY) - .long(Self::TLS_KEY) - .env("P_TLS_KEY_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where private key file is located. Required to enable TLS"), - ) - .arg( - Arg::new(Self::TRUSTED_CA_CERTS_PATH) - .long(Self::TRUSTED_CA_CERTS_PATH) - .env("P_TRUSTED_CA_CERTS_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device where all trusted certificates are located.") - ) - .arg( - Arg::new(Self::ADDRESS) - .long(Self::ADDRESS) - .env("P_ADDR") - .value_name("ADDR:PORT") - .default_value("0.0.0.0:8000") - .value_parser(validation::socket_addr) - .help("Address and port for Parseable HTTP(s) server"), - ) - .arg( - Arg::new(Self::STAGING) - .long(Self::STAGING) - .env("P_STAGING_DIR") - .value_name("DIR") - .default_value("./staging") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used as landing point for incoming events") - .next_line_help(true), - ) - .arg( - Arg::new(Self::USERNAME) - .long(Self::USERNAME) - .env("P_USERNAME") - .value_name("STRING") - .required(true) - .help("Admin username to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::PASSWORD) - .long(Self::PASSWORD) - .env("P_PASSWORD") - .value_name("STRING") - .required(true) - .help("Admin password to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::CHECK_UPDATE) - .long(Self::CHECK_UPDATE) - .env("P_CHECK_UPDATE") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable checking for new Parseable release"), - ) - .arg( - Arg::new(Self::SEND_ANALYTICS) - .long(Self::SEND_ANALYTICS) - .env("P_SEND_ANONYMOUS_USAGE_DATA") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable anonymous telemetry data collection"), - ) - .arg( - Arg::new(Self::OPEN_AI_KEY) - .long(Self::OPEN_AI_KEY) - .env("P_OPENAI_API_KEY") - .value_name("STRING") - .required(false) - .help("OpenAI key to enable llm features"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_ID) - .long(Self::OPENID_CLIENT_ID) - .env("P_OIDC_CLIENT_ID") - .value_name("STRING") - .required(false) - .help("Client id for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_SECRET) - .long(Self::OPENID_CLIENT_SECRET) - .env("P_OIDC_CLIENT_SECRET") - .value_name("STRING") - .required(false) - .help("Client secret for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_ISSUER) - .long(Self::OPENID_ISSUER) - .env("P_OIDC_ISSUER") - .value_name("URL") - .required(false) - .value_parser(validation::url) - .help("OIDC provider's host address"), - ) - .arg( - Arg::new(Self::DOMAIN_URI) - .long(Self::DOMAIN_URI) - .env("P_ORIGIN_URI") - .value_name("URL") - .required(false) - .value_parser(validation::url) - .help("Parseable server global domain address"), - ) - .arg( - Arg::new(Self::GRPC_PORT) - .long(Self::GRPC_PORT) - .env("P_GRPC_PORT") - .value_name("PORT") - .default_value("8001") - .required(false) - .value_parser(value_parser!(u16)) - .help("Port for gRPC server"), - ) - .arg( - Arg::new(Self::FLIGHT_PORT) - .long(Self::FLIGHT_PORT) - .env("P_FLIGHT_PORT") - .value_name("PORT") - .default_value("8002") - .required(false) - .value_parser(value_parser!(u16)) - .help("Port for Arrow Flight Querying Engine"), - ) - .arg( - Arg::new(Self::CORS) - .long(Self::CORS) - .env("P_CORS") - .value_name("BOOL") - 
.required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable CORS, default disabled"), - ) - .arg( - Arg::new(Self::LIVETAIL_CAPACITY) - .long(Self::LIVETAIL_CAPACITY) - .env("P_LIVETAIL_CAPACITY") - .value_name("NUMBER") - .default_value("1000") - .required(false) - .value_parser(value_parser!(usize)) - .help("Number of rows in livetail channel"), - ) - .arg( - Arg::new(Self::QUERY_MEM_POOL_SIZE) - .long(Self::QUERY_MEM_POOL_SIZE) - .env("P_QUERY_MEMORY_LIMIT") - .value_name("Gib") - .required(false) - .value_parser(value_parser!(u8)) - .help("Set a fixed memory limit for query"), - ) - .arg( - // RowGroupSize controls the number of rows present in one row group - // More rows = better compression but HIGHER Memory consumption during read/write - // 1048576 is the default value for DataFusion - Arg::new(Self::ROW_GROUP_SIZE) - .long(Self::ROW_GROUP_SIZE) - .env("P_PARQUET_ROW_GROUP_SIZE") - .value_name("NUMBER") - .required(false) - .default_value("1048576") - .value_parser(value_parser!(usize)) - .help("Number of rows in a row group"), - ).arg( - Arg::new(Self::MODE) - .long(Self::MODE) - .env("P_MODE") - .value_name("STRING") - .required(false) - .default_value("all") - .value_parser([ - "query", - "ingest", - "all"]) - .help("Mode of operation"), - ) - .arg( - Arg::new(Self::INGESTOR_ENDPOINT) - .long(Self::INGESTOR_ENDPOINT) - .env("P_INGESTOR_ENDPOINT") - .value_name("URL") - .required(false) - .help("URL to connect to this specific ingestor. Default is the address of the server.") - ) - .arg( - Arg::new(Self::PARQUET_COMPRESSION_ALGO) - .long(Self::PARQUET_COMPRESSION_ALGO) - .env("P_PARQUET_COMPRESSION_ALGO") - .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]") - .required(false) - .default_value("lz4") - .value_parser([ - "uncompressed", - "snappy", - "gzip", - "lzo", - "brotli", - "lz4", - "zstd"]) - .help("Parquet compression algorithm"), - ) - .arg( - Arg::new(Self::HOT_TIER_PATH) - .long(Self::HOT_TIER_PATH) - .env("P_HOT_TIER_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for hot tier data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::MAX_DISK_USAGE) - .long(Self::MAX_DISK_USAGE) - .env("P_MAX_DISK_USAGE_PERCENT") - .value_name("percentage") - .default_value("80.0") - .value_parser(validation::validate_disk_usage) - .help("Maximum allowed disk usage in percentage e.g 90.0 for 90%") - .next_line_help(true), - ) - .arg( - Arg::new(Self::MS_CLARITY_TAG) - .long(Self::MS_CLARITY_TAG) - .env("P_MS_CLARITY_TAG") - .value_name("STRING") - .required(false) - .help("Tag for MS Clarity"), - ) - .group( - ArgGroup::new("oidc") - .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .multiple(true) - ) - } -} - -impl FromArgMatches for Cli { - fn from_arg_matches(m: &clap::ArgMatches) -> Result { - let mut s: Self = Self::default(); - s.update_from_arg_matches(m)?; - Ok(s) - } - - fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { - #[cfg(any( - all(target_os = "linux", target_arch = "x86_64"), - all(target_os = "macos", target_arch = "aarch64") - ))] - { - self.kafka_topics = m.get_one::(Self::KAFKA_TOPICS).cloned(); - self.kafka_security_protocol = m - .get_one::(Self::KAFKA_SECURITY_PROTOCOL) - .cloned(); - self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); - self.kafka_client_id = 
m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); - self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); - self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); - } - - self.audit_logger = m.get_one::(Self::AUDIT_LOGGER).cloned(); - self.audit_username = m.get_one::(Self::AUDIT_USERNAME).cloned(); - self.audit_password = m.get_one::(Self::AUDIT_PASSWORD).cloned(); - - self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); - self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); - self.trusted_ca_certs_path = m.get_one::(Self::TRUSTED_CA_CERTS_PATH).cloned(); - self.domain_address = m.get_one::(Self::DOMAIN_URI).cloned(); - - self.address = m - .get_one::(Self::ADDRESS) - .cloned() - .expect("default value for address"); - - self.ingestor_endpoint = m - .get_one::(Self::INGESTOR_ENDPOINT) - .cloned() - .unwrap_or_else(String::default); - - self.local_staging_path = m - .get_one::(Self::STAGING) - .cloned() - .expect("default value for staging"); - self.username = m - .get_one::(Self::USERNAME) - .cloned() - .expect("default for username"); - self.password = m - .get_one::(Self::PASSWORD) - .cloned() - .expect("default for password"); - self.check_update = m - .get_one::(Self::CHECK_UPDATE) - .cloned() - .expect("default for check update"); - self.send_analytics = m - .get_one::(Self::SEND_ANALYTICS) - .cloned() - .expect("default for send analytics"); - self.open_ai_key = m.get_one::(Self::OPEN_AI_KEY).cloned(); - self.grpc_port = m - .get_one::(Self::GRPC_PORT) - .cloned() - .expect("default for livetail port"); - self.flight_port = m - .get_one::(Self::FLIGHT_PORT) - .cloned() - .expect("default for flight port"); - self.cors = m - .get_one::(Self::CORS) - .cloned() - .expect("default for CORS"); - self.livetail_channel_capacity = m - .get_one::(Self::LIVETAIL_CAPACITY) - .cloned() - .expect("default for livetail capacity"); - // converts Gib to bytes before assigning - self.query_memory_pool_size = m - .get_one::(Self::QUERY_MEM_POOL_SIZE) - .cloned() - .map(|gib| gib as usize * 1024usize.pow(3)); - self.row_group_size = m - .get_one::(Self::ROW_GROUP_SIZE) - .cloned() - .expect("default for row_group size"); - self.parquet_compression = serde_json::from_str(&format!( - "{:?}", - m.get_one::(Self::PARQUET_COMPRESSION_ALGO) - .expect("default for compression algo") - )) - .expect("unexpected compression algo"); - - let openid_client_id = m.get_one::(Self::OPENID_CLIENT_ID).cloned(); - let openid_client_secret = m.get_one::(Self::OPENID_CLIENT_SECRET).cloned(); - let openid_issuer = m.get_one::(Self::OPENID_ISSUER).cloned(); - - self.openid = match (openid_client_id, openid_client_secret, openid_issuer) { + pub fn openid(&self) -> Option { + match (&self.oidc_client_id, &self.oidc_client_secret, &self.oidc_issuer) { (Some(id), Some(secret), Some(issuer)) => { let origin = if let Some(url) = self.domain_address.clone() { oidc::Origin::Production(url) @@ -615,34 +416,13 @@ impl FromArgMatches for Cli { } }; Some(OpenidConfig { - id, - secret, - issuer, + id: id.clone(), + secret: secret.clone(), + issuer: issuer.clone(), origin, }) } _ => None, - }; - - self.mode = match m - .get_one::(Self::MODE) - .expect("Mode not set") - .as_str() - { - "query" => Mode::Query, - "ingest" => Mode::Ingest, - "all" => Mode::All, - _ => unreachable!(), - }; - - self.hot_tier_storage_path = m.get_one::(Self::HOT_TIER_PATH).cloned(); - self.max_disk_usage = m - .get_one::(Self::MAX_DISK_USAGE) - .cloned() - .expect("default for max disk usage"); - - self.ms_clarity_tag = 
m.get_one::(Self::MS_CLARITY_TAG).cloned(); - - Ok(()) + } } } diff --git a/src/event/writer/mod.rs b/src/event/writer/mod.rs index af31fc54f..9efbc3fcc 100644 --- a/src/event/writer/mod.rs +++ b/src/event/writer/mod.rs @@ -138,7 +138,7 @@ impl WriterTable { custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { - if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal { + if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { stream_writer.lock().unwrap().push( stream_name, schema_key, @@ -169,7 +169,7 @@ impl WriterTable { ) -> Result<(), StreamWriterError> { match map.get(stream_name) { Some(writer) => { - if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal { + if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { writer.lock().unwrap().push( stream_name, schema_key, @@ -182,7 +182,7 @@ impl WriterTable { } } None => { - if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal { + if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { let mut writer = Writer::default(); writer.push( stream_name, diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 899157df9..5edfdba21 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -284,12 +284,12 @@ impl FlightService for AirServiceImpl { pub fn server() -> impl Future>> + Send { let mut addr: SocketAddr = CONFIG - .parseable + .options .address .parse() .unwrap_or_else(|err| panic!("{}, failed to parse `{}` as a socket address. Please set the environment variable `P_ADDR` to `:` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", -CONFIG.parseable.address, err)); - addr.set_port(CONFIG.parseable.flight_port); +CONFIG.options.address, err)); + addr.set_port(CONFIG.options.flight_port); let service = AirServiceImpl {}; @@ -301,10 +301,7 @@ CONFIG.parseable.address, err)); let cors = cross_origin_config(); - let identity = match ( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - ) { + let identity = match (&CONFIG.options.tls_cert_path, &CONFIG.options.tls_key_path) { (Some(cert), Some(key)) => { match (std::fs::read_to_string(cert), std::fs::read_to_string(key)) { (Ok(cert_file), Ok(key_file)) => { diff --git a/src/handlers/http/about.rs b/src/handlers/http/about.rs index 79f9c22b2..9e277b797 100644 --- a/src/handlers/http/about.rs +++ b/src/handlers/http/about.rs @@ -62,17 +62,17 @@ pub async fn about() -> Json { let commit = current_release.commit_hash; let deployment_id = meta.deployment_id.to_string(); let mode = CONFIG.get_server_mode_string(); - let staging = if CONFIG.parseable.mode == Mode::Query { + let staging = if CONFIG.options.mode == Mode::Query { "".to_string() } else { CONFIG.staging_dir().display().to_string() }; - let grpc_port = CONFIG.parseable.grpc_port; + let grpc_port = CONFIG.options.grpc_port; let store_endpoint = CONFIG.storage().get_endpoint(); - let is_llm_active = &CONFIG.parseable.open_ai_key.is_some(); + let is_llm_active = &CONFIG.options.open_ai_key.is_some(); let llm_provider = is_llm_active.then_some("OpenAI"); - let is_oidc_active = CONFIG.parseable.openid.is_some(); + let is_oidc_active = CONFIG.options.openid().is_some(); let ui_version = option_env!("UI_VERSION").unwrap_or("development"); let hot_tier_details: String = if CONFIG.hot_tier_dir().is_none() { @@ -85,7 +85,7 @@ pub async fn about() 
-> Json { ) }; - let ms_clarity_tag = &CONFIG.parseable.ms_clarity_tag; + let ms_clarity_tag = &CONFIG.options.ms_clarity_tag; Json(json!({ "version": current_version, diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 6f3cb3f07..5fe2fc327 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -226,7 +226,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result {} Ok(false) | Err(_) => return Err(PostError::StreamNotFound(stream_name.clone())), @@ -274,7 +274,7 @@ pub async fn create_stream_if_not_exists( // For distributed deployments, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if CONFIG.parseable.mode != Mode::All + if CONFIG.options.mode != Mode::All && create_stream_and_schema_from_storage(stream_name).await? { return Ok(stream_exists); diff --git a/src/handlers/http/llm.rs b/src/handlers/http/llm.rs index bf1b2968a..611220e88 100644 --- a/src/handlers/http/llm.rs +++ b/src/handlers/http/llm.rs @@ -87,7 +87,7 @@ fn build_request_body(ai_prompt: String) -> impl serde::Serialize { } pub async fn make_llm_request(body: web::Json) -> Result { - let api_key = match &CONFIG.parseable.open_ai_key { + let api_key = match &CONFIG.options.open_ai_key { Some(api_key) if api_key.len() > 3 => api_key, _ => return Err(LLMError::InvalidAPIKey), }; diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index cafe56190..58a6ba65c 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -138,7 +138,7 @@ pub async fn schema(req: HttpRequest) -> Result { match STREAM_INFO.schema(&stream_name) { Ok(_) => {} - Err(_) if CONFIG.parseable.mode == Mode::Query => { + Err(_) if CONFIG.options.mode == Mode::Query => { if !create_stream_and_schema_from_storage(&stream_name).await? 
{ return Err(StreamError::StreamNotFound(stream_name.clone())); } @@ -223,7 +223,7 @@ pub async fn put_alert( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if CONFIG.parseable.mode == Mode::Query { + if CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -271,7 +271,7 @@ pub async fn get_retention(req: HttpRequest) -> Result {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -304,7 +304,7 @@ pub async fn put_retention( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if CONFIG.parseable.mode == Mode::Query { + if CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -368,7 +368,7 @@ pub async fn get_stats(req: HttpRequest) -> Result // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if cfg!(not(test)) && CONFIG.parseable.mode == Mode::Query { + if cfg!(not(test)) && CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -545,7 +545,7 @@ pub async fn create_stream( pub async fn get_stream_info(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !STREAM_INFO.stream_exists(&stream_name) { - if CONFIG.parseable.mode == Mode::Query { + if CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -599,7 +599,7 @@ pub async fn put_stream_hot_tier( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if CONFIG.parseable.mode == Mode::Query { + if CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -615,7 +615,7 @@ pub async fn put_stream_hot_tier( status: StatusCode::BAD_REQUEST, }); } - if CONFIG.parseable.hot_tier_storage_path.is_none() { + if CONFIG.options.hot_tier_storage_path.is_none() { return Err(StreamError::HotTierNotEnabled(stream_name)); } @@ -659,7 +659,7 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -669,7 +669,7 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result Result {} Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), @@ -704,7 +704,7 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result Self::Future { let username = req.match_info().get("username").unwrap_or(""); - let is_root = username == CONFIG.parseable.username; + let is_root = username == CONFIG.options.username; let fut = self.service.call(req); Box::pin(async move { @@ -300,7 +300,7 @@ where fn call(&self, req: ServiceRequest) -> Self::Future { let path = req.path(); - let 
mode = &CONFIG.parseable.mode; + let mode = &CONFIG.options.mode; // change error messages based on mode match mode { Mode::Query => { diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 8d8db14c9..00b7e50e8 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -56,7 +56,7 @@ pub fn metrics_path() -> String { } pub(crate) fn cross_origin_config() -> Cors { - if !CONFIG.parseable.cors || cfg!(feature = "debug") { + if !CONFIG.options.cors || cfg!(feature = "debug") { Cors::permissive().block_on_origin_mismatch(false) } else { Cors::default().block_on_origin_mismatch(false) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index d8b4ceefb..215f79478 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -355,7 +355,7 @@ async fn validate_credentials() -> anyhow::Result<()> { let token = base64::prelude::BASE64_STANDARD.encode(format!( "{}:{}", - CONFIG.parseable.username, CONFIG.parseable.password + CONFIG.options.username, CONFIG.options.password )); let token = format!("Basic {}", token); diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 311c94294..89fc7021e 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -92,9 +92,9 @@ pub trait ParseableServer { // get the ssl stuff let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - &CONFIG.parseable.trusted_ca_certs_path, + &CONFIG.options.tls_cert_path, + &CONFIG.options.tls_key_path, + &CONFIG.options.trusted_ca_certs_path, )?; // fn that creates the app @@ -117,10 +117,10 @@ pub trait ParseableServer { // Start the server with or without TLS let srv = if let Some(config) = ssl { http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? + .bind_rustls_0_22(&CONFIG.options.address, config)? .run() } else { - http_server.bind(&CONFIG.parseable.address)?.run() + http_server.bind(&CONFIG.options.address)?.run() }; // Graceful shutdown handling diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index ef1412529..9ebca9358 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -105,7 +105,7 @@ impl ParseableServer for QueryServer { // all internal data structures populated now. 
// start the analytics scheduler if enabled - if CONFIG.parseable.send_analytics { + if CONFIG.options.send_analytics { analytics::init_analytics_scheduler()?; } @@ -122,7 +122,7 @@ impl ParseableServer for QueryServer { sync::object_store_sync().await; tokio::spawn(airplane::server()); - let app = self.start(shutdown_rx, prometheus, CONFIG.parseable.openid.clone()); + let app = self.start(shutdown_rx, prometheus, CONFIG.options.openid()); tokio::pin!(app); loop { diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 4c37f1050..b0bde3250 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -118,14 +118,14 @@ impl ParseableServer for Server { let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; - if CONFIG.parseable.send_analytics { + if CONFIG.options.send_analytics { analytics::init_analytics_scheduler()?; } tokio::spawn(handlers::livetail::server()); tokio::spawn(handlers::airplane::server()); - let app = self.start(shutdown_rx, prometheus, CONFIG.parseable.openid.clone()); + let app = self.start(shutdown_rx, prometheus, CONFIG.options.openid()); tokio::pin!(app); diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 3543198d0..07c6963e6 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -60,7 +60,7 @@ pub async fn create_update_stream( } if !metadata::STREAM_INFO.stream_exists(stream_name) - && CONFIG.parseable.mode == Mode::Query + && CONFIG.options.mode == Mode::Query && create_stream_and_schema_from_storage(stream_name).await? { return Err(StreamError::Custom { diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs index e997f1208..ba25f7c5b 100644 --- a/src/handlers/http/oidc.rs +++ b/src/handlers/http/oidc.rs @@ -197,7 +197,7 @@ pub async fn reply_login( let redirect_url = login_query .state .clone() - .unwrap_or_else(|| CONFIG.parseable.address.to_string()); + .unwrap_or_else(|| CONFIG.options.address.to_string()); Ok(redirect_to_client( &redirect_url, diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 7cecb1416..c2a361dff 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -123,7 +123,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result) -> Result<(), QueryError> { - if CONFIG.parseable.mode == Mode::Query { + if CONFIG.options.mode == Mode::Query { for table in tables { if let Ok(new_schema) = fetch_schema(table).await { // commit schema merges the schema internally and updates the schema in storage. 
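Note on the two gRPC servers: the src/handlers/livetail.rs hunk below repeats the same TLS-identity construction that src/handlers/airplane.rs uses above — both match on the `(tls_cert_path, tls_key_path)` pair from `CONFIG.options` and only build an identity when both files read cleanly. A minimal sketch of what a shared helper could look like; `read_tls_pair` is a hypothetical name, not something this PR adds:

```rust
use std::path::PathBuf;

/// Hypothetical helper mirroring the match used in airplane.rs and
/// livetail.rs: yields the PEM contents only when both the cert and the
/// key path are configured and both files are readable.
fn read_tls_pair(cert: &Option<PathBuf>, key: &Option<PathBuf>) -> Option<(String, String)> {
    match (cert, key) {
        (Some(cert), Some(key)) => {
            match (std::fs::read_to_string(cert), std::fs::read_to_string(key)) {
                (Ok(cert_file), Ok(key_file)) => Some((cert_file, key_file)),
                _ => None,
            }
        }
        _ => None,
    }
}
```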
diff --git a/src/handlers/livetail.rs b/src/handlers/livetail.rs index 646c982c5..ff968b811 100644 --- a/src/handlers/livetail.rs +++ b/src/handlers/livetail.rs @@ -173,11 +173,11 @@ impl FlightService for FlightServiceImpl { pub fn server() -> impl Future>> + Send { let mut addr: SocketAddr = CONFIG - .parseable + .options .address .parse() .expect("valid socket address"); - addr.set_port(CONFIG.parseable.grpc_port); + addr.set_port(CONFIG.options.grpc_port); let service = FlightServiceImpl {}; @@ -185,10 +185,7 @@ pub fn server() -> impl Future { match (std::fs::read_to_string(cert), std::fs::read_to_string(key)) { (Ok(cert_file), Ok(key_file)) => { diff --git a/src/hottier.rs b/src/hottier.rs index 32a7bd4c3..29039a1b4 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -72,7 +72,7 @@ impl HotTierManager { pub fn global() -> Option<&'static HotTierManager> { static INSTANCE: OnceCell = OnceCell::new(); - let hot_tier_path = &CONFIG.parseable.hot_tier_storage_path; + let hot_tier_path = &CONFIG.options.hot_tier_storage_path; if hot_tier_path.is_none() { return None; } @@ -136,7 +136,7 @@ impl HotTierManager { let (total_hot_tier_size, total_hot_tier_used_size) = self.get_hot_tiers_size(stream).await?; - let disk_threshold = (CONFIG.parseable.max_disk_usage * total_space as f64) / 100.0; + let disk_threshold = (CONFIG.options.max_disk_usage * total_space as f64) / 100.0; let max_allowed_hot_tier_size = disk_threshold - total_hot_tier_size as f64 - (used_space as f64 @@ -570,7 +570,7 @@ impl HotTierManager { 'loop_files: while let Some(file_to_delete) = manifest.files.pop() { let file_size = file_to_delete.file_size; let path_to_delete = CONFIG - .parseable + .options .hot_tier_storage_path .as_ref() .unwrap() @@ -632,7 +632,7 @@ impl HotTierManager { } if ((used_space + size_to_download) as f64 * 100.0 / total_space as f64) - > CONFIG.parseable.max_disk_usage + > CONFIG.options.max_disk_usage { return Ok(false); } @@ -694,7 +694,7 @@ impl HotTierManager { } pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> { - if CONFIG.parseable.hot_tier_storage_path.is_some() + if CONFIG.options.hot_tier_storage_path.is_some() && !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) { let mut stream_hot_tier = StreamHotTier { @@ -735,7 +735,7 @@ struct DiskUtil { /// And parseable is running with `P_HOT_TIER_DIR` pointing to a directory in /// `/home/parseable`, we should return the usage stats of the disk mounted there. 
fn get_disk_usage() -> Option { - let path = CONFIG.parseable.hot_tier_storage_path.as_ref()?; + let path = CONFIG.options.hot_tier_storage_path.as_ref()?; let mut disks = Disks::new_with_refreshed_list(); // Order the disk partitions by decreasing length of mount path disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len()); diff --git a/src/kafka.rs b/src/kafka.rs index 45c7d5220..43b480d8d 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -92,18 +92,18 @@ pub enum KafkaError { } fn setup_consumer() -> Result<(StreamConsumer, Vec), KafkaError> { - if let Some(topics) = &CONFIG.parseable.kafka_topics { + if let Some(topics) = &CONFIG.options.kafka_topics { // topics can be a comma separated list of topics to subscribe to let topics = topics.split(',').map(|v| v.to_owned()).collect_vec(); - let host = if CONFIG.parseable.kafka_host.is_some() { - CONFIG.parseable.kafka_host.as_ref() + let host = if CONFIG.options.kafka_host.is_some() { + CONFIG.options.kafka_host.as_ref() } else { return Err(KafkaError::NoVarError("P_KAKFA_HOST")); }; - let group = if CONFIG.parseable.kafka_group.is_some() { - CONFIG.parseable.kafka_group.as_ref() + let group = if CONFIG.options.kafka_group.is_some() { + CONFIG.options.kafka_group.as_ref() } else { return Err(KafkaError::NoVarError("P_KAKFA_GROUP")); }; @@ -112,18 +112,18 @@ fn setup_consumer() -> Result<(StreamConsumer, Vec), KafkaError> { conf.set("bootstrap.servers", host.unwrap()); conf.set("group.id", group.unwrap()); - if let Some(val) = CONFIG.parseable.kafka_client_id.as_ref() { + if let Some(val) = CONFIG.options.kafka_client_id.as_ref() { conf.set("client.id", val); } - if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() { + if let Some(ssl_protocol) = CONFIG.options.kafka_security_protocol.as_ref() { conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?); } let consumer: StreamConsumer = conf.create()?; consumer.subscribe(&topics.iter().map(|v| v.as_str()).collect_vec())?; - if let Some(vals_raw) = CONFIG.parseable.kafka_partitions.as_ref() { + if let Some(vals_raw) = CONFIG.options.kafka_partitions.as_ref() { // partitions is a comma separated pairs of topic:partitions let mut topic_partition_pairs = Vec::new(); let mut set = true; @@ -239,7 +239,7 @@ pub async fn setup_integration() { while let Ok(curr) = stream.next().await.unwrap() { // TODO: maybe we should not constructs an audit log for each kafka message, but do so at the batch level let log_builder = AuditLogBuilder::default() - .with_host(CONFIG.parseable.kafka_host.as_deref().unwrap_or("")) + .with_host(CONFIG.options.kafka_host.as_deref().unwrap_or("")) .with_user_agent("Kafka Client") .with_protocol("Kafka") .with_stream(curr.topic()); diff --git a/src/main.rs b/src/main.rs index 93f25f5df..069306951 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> { .init(); // these are empty ptrs so mem footprint should be minimal - let server: Box = match CONFIG.parseable.mode { + let server: Box = match CONFIG.options.mode { Mode::Query => Box::new(QueryServer), Mode::Ingest => Box::new(IngestServer), Mode::All => Box::new(Server), @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> { all(target_os = "macos", target_arch = "aarch64") ))] // load kafka server - if CONFIG.parseable.mode != Mode::Query { + if CONFIG.options.mode != Mode::Query { tokio::task::spawn(kafka::setup_integration()); } diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs index 
298bad2b0..4385c93b0 100644 --- a/src/migration/metadata_migration.rs +++ b/src/migration/metadata_migration.rs @@ -47,7 +47,7 @@ pub fn v1_v3(mut storage_metadata: JsonValue) -> JsonValue { metadata.insert("users".to_string(), JsonValue::Array(vec![])); metadata.insert("streams".to_string(), JsonValue::Array(vec![])); metadata.insert("roles".to_string(), JsonValue::Array(vec![])); - metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); + metadata.insert("server_mode".to_string(), json!(CONFIG.options.mode)); storage_metadata } @@ -108,7 +108,7 @@ pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue { "roles".to_string(), JsonValue::Object(Map::from_iter(privileges_map)), ); - metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); + metadata.insert("server_mode".to_string(), json!(CONFIG.options.mode)); storage_metadata } @@ -119,7 +119,7 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue { let sm = metadata.get("server_mode"); if sm.is_none() || sm.unwrap().as_str().unwrap() == "All" { - metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); + metadata.insert("server_mode".to_string(), json!(CONFIG.options.mode)); } let roles = metadata.get_mut("roles").unwrap().as_object_mut().unwrap(); @@ -146,17 +146,17 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue { match metadata.get("server_mode") { None => { - metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); + metadata.insert("server_mode".to_string(), json!(CONFIG.options.mode)); } Some(JsonValue::String(mode)) => match mode.as_str() { "Query" => { metadata.insert( "querier_endpoint".to_string(), - JsonValue::String(CONFIG.parseable.address.clone()), + JsonValue::String(CONFIG.options.address.clone()), ); } "All" => { - metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); + metadata.insert("server_mode".to_string(), json!(CONFIG.options.mode)); } _ => (), }, @@ -191,7 +191,7 @@ pub async fn migrate_ingester_metadata() -> anyhow::Result anyhow::Result<()> { let streams = storage.list_streams().await?; for stream in streams { migration_stream(&stream.name, &*storage).await?; - if CONFIG.parseable.hot_tier_storage_path.is_some() { + if CONFIG.options.hot_tier_storage_path.is_some() { migration_hot_tier(&stream.name).await?; } } @@ -218,7 +218,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: let mut stream_meta_found = true; if stream_metadata.is_empty() { - if CONFIG.parseable.mode != Mode::Ingest { + if CONFIG.options.mode != Mode::Ingest { return Ok(()); } stream_meta_found = false; diff --git a/src/option.rs b/src/option.rs index f0de887d1..e6c2d9200 100644 --- a/src/option.rs +++ b/src/option.rs @@ -16,18 +16,15 @@ * */ -use crate::cli::Cli; +use crate::cli::{Cli, Options, StorageOptions, DEFAULT_PASSWORD, DEFAULT_USERNAME}; use crate::storage::object_storage::parseable_json_path; -use crate::storage::{ - AzureBlobConfig, FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config, -}; +use crate::storage::{ObjectStorageError, ObjectStorageProvider}; use bytes::Bytes; use clap::error::ErrorKind; -use clap::{command, Args, Command, FromArgMatches}; +use clap::Parser; use once_cell::sync::Lazy; use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use serde::{Deserialize, Serialize}; -use std::env; use std::path::PathBuf; use std::sync::Arc; @@ -37,101 +34,47 @@ pub static CONFIG: Lazy> = Lazy::new(|| Arc::new(Config::new())); #[derive(Debug)] pub struct Config { - pub 
parseable: Cli, + pub options: Options, storage: Arc, pub storage_name: &'static str, } impl Config { fn new() -> Self { - let cli = create_parseable_cli_command() - .name("Parseable") - .about( - r#" -Cloud Native, log analytics platform for modern applications. - -Usage: -parseable [command] [options..] - - -Help: -parseable [command] --help - -"#, - ) - .arg_required_else_help(true) - .subcommand_required(true) - .color(clap::ColorChoice::Always) - .get_matches(); - - match cli.subcommand() { - Some(("local-store", m)) => { - let cli = match Cli::from_arg_matches(m) { - Ok(cli) => cli, - Err(err) => err.exit(), - }; - let storage = match FSConfig::from_arg_matches(m) { - Ok(storage) => storage, - Err(err) => err.exit(), - }; - - if cli.local_staging_path == storage.root { - create_parseable_cli_command() - .error( - ErrorKind::ValueValidation, - "Cannot use same path for storage and staging", - ) - .exit() + match Cli::parse().storage { + StorageOptions::Local(args) => { + if args.options.local_staging_path == args.storage.root { + clap::Error::raw( + ErrorKind::ValueValidation, + "Cannot use same path for storage and staging", + ) + .exit(); } - if cli.hot_tier_storage_path.is_some() { - create_parseable_cli_command() - .error( - ErrorKind::ValueValidation, - "Cannot use hot tier with local-store subcommand.", - ) - .exit() + if args.options.hot_tier_storage_path.is_some() { + clap::Error::raw( + ErrorKind::ValueValidation, + "Cannot use hot tier with local-store subcommand.", + ) + .exit(); } Config { - parseable: cli, - storage: Arc::new(storage), + options: args.options, + storage: Arc::new(args.storage), storage_name: "drive", } } - Some(("s3-store", m)) => { - let cli = match Cli::from_arg_matches(m) { - Ok(cli) => cli, - Err(err) => err.exit(), - }; - let storage = match S3Config::from_arg_matches(m) { - Ok(storage) => storage, - Err(err) => err.exit(), - }; - - Config { - parseable: cli, - storage: Arc::new(storage), - storage_name: "s3", - } - } - Some(("blob-store", m)) => { - let cli = match Cli::from_arg_matches(m) { - Ok(cli) => cli, - Err(err) => err.exit(), - }; - let storage = match AzureBlobConfig::from_arg_matches(m) { - Ok(storage) => storage, - Err(err) => err.exit(), - }; - - Config { - parseable: cli, - storage: Arc::new(storage), - storage_name: "blob_store", - } - } - _ => unreachable!(), + StorageOptions::S3(args) => Config { + options: args.options, + storage: Arc::new(args.storage), + storage_name: "s3", + }, + StorageOptions::Blob(args) => Config { + options: args.options, + storage: Arc::new(args.storage), + storage_name: "blob_store", + }, } } @@ -174,16 +117,15 @@ parseable [command] --help } pub fn staging_dir(&self) -> &PathBuf { - &self.parseable.local_staging_path + &self.options.local_staging_path } pub fn hot_tier_dir(&self) -> &Option { - &self.parseable.hot_tier_storage_path + &self.options.hot_tier_storage_path } pub fn is_default_creds(&self) -> bool { - self.parseable.username == Cli::DEFAULT_USERNAME - && self.parseable.password == Cli::DEFAULT_PASSWORD + self.options.username == DEFAULT_USERNAME && self.options.password == DEFAULT_PASSWORD } // returns the string representation of the storage mode @@ -202,7 +144,7 @@ parseable [command] --help } pub fn get_server_mode_string(&self) -> &str { - match self.parseable.mode { + match self.options.mode { Mode::Query => "Distributed (Query)", Mode::Ingest => "Distributed (Ingest)", Mode::All => "Standalone", @@ -210,40 +152,6 @@ parseable [command] --help } } -fn create_parseable_cli_command() -> 
@@ -174,16 +117,15 @@ parseable [command] --help
     }
 
     pub fn staging_dir(&self) -> &PathBuf {
-        &self.parseable.local_staging_path
+        &self.options.local_staging_path
     }
 
     pub fn hot_tier_dir(&self) -> &Option<PathBuf> {
-        &self.parseable.hot_tier_storage_path
+        &self.options.hot_tier_storage_path
     }
 
     pub fn is_default_creds(&self) -> bool {
-        self.parseable.username == Cli::DEFAULT_USERNAME
-            && self.parseable.password == Cli::DEFAULT_PASSWORD
+        self.options.username == DEFAULT_USERNAME && self.options.password == DEFAULT_PASSWORD
     }
 
     // returns the string representation of the storage mode
@@ -202,7 +144,7 @@ parseable [command] --help
     }
 
     pub fn get_server_mode_string(&self) -> &str {
-        match self.parseable.mode {
+        match self.options.mode {
             Mode::Query => "Distributed (Query)",
             Mode::Ingest => "Distributed (Ingest)",
             Mode::All => "Standalone",
@@ -210,40 +152,6 @@ parseable [command] --help
     }
 }
 
-fn create_parseable_cli_command() -> Command {
-    let local = Cli::create_cli_command_with_clap("local-store");
-    let local = <FSConfig as Args>::augment_args_for_update(local);
-
-    let local = local
-        .mut_arg(Cli::USERNAME, |arg| {
-            arg.required(false).default_value(Cli::DEFAULT_USERNAME)
-        })
-        .mut_arg(Cli::PASSWORD, |arg| {
-            arg.required(false).default_value(Cli::DEFAULT_PASSWORD)
-        });
-    let s3 = Cli::create_cli_command_with_clap("s3-store");
-    let s3 = <S3Config as Args>::augment_args_for_update(s3);
-
-    let azureblob = Cli::create_cli_command_with_clap("blob-store");
-    let azureblob = <AzureBlobConfig as Args>::augment_args_for_update(azureblob);
-
-    command!()
-        .name("Parseable")
-        .bin_name("parseable")
-        .propagate_version(true)
-        .next_line_help(false)
-        .help_template(
-            r#"{name} v{version}
-{about}
-Join the community at https://logg.ing/community.
-
-{all-args}
-        "#,
-        )
-        .subcommand_required(true)
-        .subcommands([local, s3, azureblob])
-}
-
 #[derive(Debug, Default, Eq, PartialEq, Clone, Copy, Serialize, Deserialize)]
 pub enum Mode {
     Query,
@@ -291,6 +199,14 @@ pub mod validation {
     use human_size::{multiples, SpecificSize};
 
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
+    use crate::kafka::SslProtocol;
+
+    use super::{Compression, Mode};
+
     pub fn file_path(s: &str) -> Result<PathBuf, String> {
         if s.is_empty() {
             return Err("empty path".to_owned());
         }
@@ -333,6 +249,42 @@ pub mod validation {
         url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
     }
 
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
+    pub fn kafka_security_protocol(s: &str) -> Result<SslProtocol, String> {
+        match s {
+            "plaintext" => Ok(SslProtocol::Plaintext),
+            "ssl" => Ok(SslProtocol::Ssl),
+            "sasl_plaintext" => Ok(SslProtocol::SaslPlaintext),
+            "sasl_ssl" => Ok(SslProtocol::SaslSsl),
+            _ => Err("Invalid Kafka Security Protocol provided".to_string()),
+        }
+    }
+
+    pub fn mode(s: &str) -> Result<Mode, String> {
+        match s {
+            "query" => Ok(Mode::Query),
+            "ingest" => Ok(Mode::Ingest),
+            "all" => Ok(Mode::All),
+            _ => Err("Invalid MODE provided".to_string()),
+        }
+    }
+
+    pub fn compression(s: &str) -> Result<Compression, String> {
+        match s {
+            "uncompressed" => Ok(Compression::Uncompressed),
+            "snappy" => Ok(Compression::Snappy),
+            "gzip" => Ok(Compression::Gzip),
+            "lzo" => Ok(Compression::Lzo),
+            "brotli" => Ok(Compression::Brotli),
+            "lz4" => Ok(Compression::Lz4),
+            "zstd" => Ok(Compression::Zstd),
+            _ => Err("Invalid COMPRESSION provided".to_string()),
+        }
+    }
+
     pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
         fn parse_and_map<T: human_size::Multiple>(
             s: &str,
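The new validators are plain `fn(&str) -> Result<T, String>` parsers, which clap v4 accepts directly as a `value_parser`. Presumably `cli.rs` wires them up via derive attributes; an equivalent builder-API sketch, using a `String` stand-in for `Mode` so the example stays self-contained (the `P_MODE` env name is an assumption):

```rust
// Requires clap with the "env" feature.
use clap::{Arg, Command};

// Stand-in for option::validation::mode from the patch, returning the
// accepted string rather than the crate's Mode enum.
fn mode(s: &str) -> Result<String, String> {
    match s {
        "query" | "ingest" | "all" => Ok(s.to_string()),
        _ => Err("Invalid MODE provided".to_string()),
    }
}

fn main() {
    // A function `fn(&str) -> Result<T, E>` (E convertible to a boxed error)
    // can be handed straight to `value_parser`.
    let cmd = Command::new("parseable").arg(
        Arg::new("mode")
            .long("mode")
            .env("P_MODE")
            .default_value("all")
            .value_parser(mode),
    );
    let matches = cmd.get_matches_from(["parseable", "--mode", "query"]);
    assert_eq!(matches.get_one::<String>("mode").unwrap(), "query");
}
```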
diff --git a/src/query/mod.rs b/src/query/mod.rs
index b86573d27..1542b1455 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -65,7 +65,7 @@ impl Query {
             .get_datafusion_runtime()
             .with_disk_manager(DiskManagerConfig::NewOs);
 
-        let (pool_size, fraction) = match CONFIG.parseable.query_memory_pool_size {
+        let (pool_size, fraction) = match CONFIG.options.query_memory_pool_size {
             Some(size) => (size, 1.),
             None => {
                 let mut system = System::new();
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 291e39996..eacc87315 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -193,7 +193,7 @@ impl StandardTableProvider {
             .into_iter()
             .map(|mut file| {
                 let path = CONFIG
-                    .parseable
+                    .options
                     .hot_tier_storage_path
                     .as_ref()
                     .unwrap()
@@ -452,7 +452,7 @@ impl TableProvider for StandardTableProvider {
             }
         };
         let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
-        if CONFIG.parseable.mode == Mode::Query {
+        if CONFIG.options.mode == Mode::Query {
             let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
             let obs = glob_storage
                 .get_objects(
diff --git a/src/rbac/map.rs b/src/rbac/map.rs
index d9c31b8c9..652bf6bc1 100644
--- a/src/rbac/map.rs
+++ b/src/rbac/map.rs
@@ -110,8 +110,8 @@ pub fn init(metadata: &StorageMetadata) {
     sessions.track_new(
         admin_username,
         SessionKey::BasicAuth {
-            username: CONFIG.parseable.username.clone(),
-            password: CONFIG.parseable.password.clone(),
+            username: CONFIG.options.username.clone(),
+            password: CONFIG.options.password.clone(),
         },
         chrono::DateTime::<Utc>::MAX_UTC,
         admin_permissions,
diff --git a/src/rbac/user.rs b/src/rbac/user.rs
index c037a8381..ad0d302a3 100644
--- a/src/rbac/user.rs
+++ b/src/rbac/user.rs
@@ -137,8 +137,8 @@ pub struct PassCode {
 }
 
 pub fn get_admin_user() -> User {
-    let username = CONFIG.parseable.username.clone();
-    let password = CONFIG.parseable.password.clone();
+    let username = CONFIG.options.username.clone();
+    let password = CONFIG.options.password.clone();
     let hashcode = gen_hash(&password);
 
     User {
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 7252cfaf0..c0084521c 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -158,7 +158,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     ) -> Result<String, ObjectStorageError> {
         let format = ObjectStoreFormat {
             created_at: Local::now().to_rfc3339(),
-            permissions: vec![Permisssion::new(CONFIG.parseable.username.clone())],
+            permissions: vec![Permisssion::new(CONFIG.options.username.clone())],
             stream_type: Some(stream_type.to_string()),
             time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()),
             time_partition_limit: time_partition_limit.map(|limit| limit.to_string()),
@@ -166,8 +166,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
             static_schema_flag,
             schema_version: SchemaVersion::V1, // NOTE: Newly created streams are all V1
             owner: Owner {
-                id: CONFIG.parseable.username.clone(),
-                group: CONFIG.parseable.username.clone(),
+                id: CONFIG.options.username.clone(),
+                group: CONFIG.options.username.clone(),
             },
             ..Default::default()
         };
@@ -331,7 +331,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         let mut config = serde_json::from_slice::<ObjectStoreFormat>(&bytes)
             .expect("parseable config is valid json");
 
-        if CONFIG.parseable.mode == Mode::Ingest {
+        if CONFIG.options.mode == Mode::Ingest {
             config.stats = FullStats::default();
             config.snapshot.manifest_list = vec![];
         }
@@ -665,7 +665,7 @@ pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
 }
 
 pub fn schema_path(stream_name: &str) -> RelativePathBuf {
-    match CONFIG.parseable.mode {
+    match CONFIG.options.mode {
         Mode::Ingest => {
             let file_name = format!(
                 ".ingestor.{}{}",
@@ -683,7 +683,7 @@ pub fn schema_path(stream_name: &str) -> RelativePathBuf {
 
 #[inline(always)]
 pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
-    match &CONFIG.parseable.mode {
+    match &CONFIG.options.mode {
         Mode::Ingest => {
             let file_name = format!(
                 ".ingestor.{}{}",
@@ -732,7 +732,7 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf {
 
 #[inline(always)]
 pub fn manifest_path(prefix: &str) -> RelativePathBuf {
-    if CONFIG.parseable.mode == Mode::Ingest {
+    if CONFIG.options.mode == Mode::Ingest {
         let manifest_file_name = format!(
             "ingestor.{}.{}",
             INGESTOR_META.get_ingestor_id(),
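The `schema_path`/`stream_json_path`/`manifest_path` hunks all follow one pattern: in `Mode::Ingest`, file names gain an `ingestor.<id>` component so concurrent ingestors never clobber each other's objects, while other modes keep the shared name. A sketch of that naming rule — the directory layout and constant values are assumptions, since the hunks truncate before them:

```rust
use relative_path::RelativePathBuf;

const STREAM_ROOT_DIRECTORY: &str = ".stream"; // assumed constant value
const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; // assumed constant value

// Ingest mode writes ".ingestor.<id>.stream.json"; other modes share ".stream.json".
fn stream_json_path(stream_name: &str, ingestor_id: Option<&str>) -> RelativePathBuf {
    match ingestor_id {
        Some(id) => {
            let file_name = format!(".ingestor.{id}{STREAM_METADATA_FILE_NAME}");
            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, file_name.as_str()])
        }
        None => RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME]),
    }
}

fn main() {
    let p = stream_json_path("app_logs", Some("abc123"));
    assert_eq!(p.as_str(), "app_logs/.stream/.ingestor.abc123.stream.json");
}
```

Because the non-ingest name stays unsuffixed, a querier can still glob every per-ingestor variant under the same directory.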
diff --git a/src/storage/staging.rs b/src/storage/staging.rs
index 9646949c5..c05ef62c4 100644
--- a/src/storage/staging.rs
+++ b/src/storage/staging.rs
@@ -62,7 +62,7 @@ pub struct StorageDir {
 
 impl StorageDir {
     pub fn new(stream_name: &str) -> Self {
-        let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
+        let data_path = CONFIG.options.local_stream_data_path(stream_name);
 
         Self { data_path }
     }
@@ -80,7 +80,7 @@ impl StorageDir {
         }
         let local_uri = str::replace(&uri, "/", ".");
         let hostname = hostname_unchecked();
-        if CONFIG.parseable.mode == Mode::Ingest {
+        if CONFIG.options.mode == Mode::Ingest {
             let id = INGESTOR_META.get_ingestor_id();
             format!("{local_uri}{hostname}{id}.{extention}")
         } else {
@@ -217,7 +217,7 @@ impl StorageDir {
 }
 
 // pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
-//     let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
+//     let data_path = CONFIG.options.local_stream_data_path(stream_name);
 //     let dir = StorageDir::file_time_suffix(time, &HashMap::new(), PARQUET_FILE_EXTENSION);
 //
 //     data_path.join(dir)
@@ -336,8 +336,8 @@ pub fn parquet_writer_props(
         nulls_first: true,
     });
     let mut props = WriterProperties::builder()
-        .set_max_row_group_size(CONFIG.parseable.row_group_size)
-        .set_compression(CONFIG.parseable.parquet_compression.into())
+        .set_max_row_group_size(CONFIG.options.row_group_size)
+        .set_compression(CONFIG.options.parquet_compression.into())
         .set_column_encoding(
             ColumnPath::new(vec![time_partition_field]),
             Encoding::DELTA_BINARY_PACKED,
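`CONFIG.options.parquet_compression.into()` implies a `From<Compression> for parquet::basic::Compression` impl elsewhere in the crate; the `BrotliLevel`/`GzipLevel`/`ZstdLevel` imports retained at the top of `option.rs` suggest a mapping along these lines (an assumption — the impl itself is outside this diff):

```rust
use parquet::basic::{BrotliLevel, Compression as ParquetCompression, GzipLevel, ZstdLevel};

// Mirror of the crate's Compression enum from option.rs.
#[derive(Debug, Clone, Copy)]
enum Compression {
    Uncompressed,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
}

// Assumed mapping; default codec levels are used where parquet requires one.
impl From<Compression> for ParquetCompression {
    fn from(value: Compression) -> Self {
        match value {
            Compression::Uncompressed => ParquetCompression::UNCOMPRESSED,
            Compression::Snappy => ParquetCompression::SNAPPY,
            Compression::Gzip => ParquetCompression::GZIP(GzipLevel::default()),
            Compression::Lzo => ParquetCompression::LZO,
            Compression::Brotli => ParquetCompression::BROTLI(BrotliLevel::default()),
            Compression::Lz4 => ParquetCompression::LZ4,
            Compression::Zstd => ParquetCompression::ZSTD(ZstdLevel::default()),
        }
    }
}

fn main() {
    println!("{:?}", ParquetCompression::from(Compression::Zstd));
}
```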
@@ -360,7 +360,7 @@ pub fn parquet_writer_props(
 }
 
 pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
-    let path = PathBuf::from(&CONFIG.parseable.local_staging_path);
+    let path = PathBuf::from(&CONFIG.options.local_staging_path);
 
     // all the files should be in the staging directory root
     let entries = std::fs::read_dir(path)?;
@@ -391,7 +391,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
             if obj.get("flight_port").is_none() {
                 obj.insert(
                     "flight_port".to_owned(),
-                    JsonValue::String(CONFIG.parseable.flight_port.to_string()),
+                    JsonValue::String(CONFIG.options.flight_port.to_string()),
                 );
             }
 
@@ -413,7 +413,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
 
             let token = base64::prelude::BASE64_STANDARD.encode(format!(
                 "{}:{}",
-                CONFIG.parseable.username, CONFIG.parseable.password
+                CONFIG.options.username, CONFIG.options.password
             ));
 
             let token = format!("Basic {}", token);
@@ -438,10 +438,10 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
         url,
         DEFAULT_VERSION.to_string(),
         store.get_bucket_name(),
-        &CONFIG.parseable.username,
-        &CONFIG.parseable.password,
+        &CONFIG.options.username,
+        &CONFIG.options.password,
         get_ingestor_id(),
-        CONFIG.parseable.flight_port.to_string(),
+        CONFIG.options.flight_port.to_string(),
     );
 
     put_ingestor_info(out.clone())?;
@@ -455,7 +455,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
 ///
 /// * `ingestor_info`: The ingestor info to be stored.
 pub fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> {
-    let path = PathBuf::from(&CONFIG.parseable.local_staging_path);
+    let path = PathBuf::from(&CONFIG.options.local_staging_path);
 
     let file_name = format!("ingestor.{}.json", info.ingestor_id);
     let file_path = path.join(file_name);
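The credential plumbing in `get_ingestor_info` is ordinary HTTP basic auth: base64 over `user:pass`, prefixed with `Basic `. The same construction in isolation, with placeholder credentials:

```rust
use base64::prelude::*;

fn main() {
    // Same construction as get_ingestor_info: "Basic " + base64("user:pass").
    let (username, password) = ("admin", "admin"); // placeholder creds
    let token = BASE64_STANDARD.encode(format!("{username}:{password}"));
    let header = format!("Basic {token}");
    assert_eq!(header, "Basic YWRtaW46YWRtaW4=");
    println!("{header}");
}
```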
diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs
index 465f8f740..fef82e094 100644
--- a/src/storage/store_metadata.rs
+++ b/src/storage/store_metadata.rs
@@ -73,7 +73,7 @@ impl Default for StorageMetadata {
             staging: CONFIG.staging_dir().to_path_buf(),
             storage: CONFIG.storage().get_endpoint(),
             deployment_id: uid::gen(),
-            server_mode: CONFIG.parseable.mode,
+            server_mode: CONFIG.options.mode,
             users: Vec::new(),
             streams: Vec::new(),
             roles: HashMap::default(),
@@ -120,7 +120,7 @@ pub async fn resolve_parseable_metadata(
         EnvChange::None(metadata) => {
             // overwrite staging anyways so that it matches remote in case of any divergence
             overwrite_staging = true;
-            if CONFIG.parseable.mode == Mode::All {
+            if CONFIG.options.mode == Mode::All {
                 standalone_after_distributed(metadata.server_mode)?;
             }
             Ok(metadata)
@@ -131,7 +131,7 @@ pub async fn resolve_parseable_metadata(
         EnvChange::NewStaging(mut metadata) => {
             // if server is started in ingest mode,we need to make sure that query mode has been started
             // i.e the metadata is updated to reflect the server mode = Query
-            if metadata.server_mode== Mode::All && CONFIG.parseable.mode == Mode::Ingest {
+            if metadata.server_mode== Mode::All && CONFIG.options.mode == Mode::Ingest {
                 Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
             } else {
                 create_dir_all(CONFIG.staging_dir())?;
@@ -140,7 +140,7 @@ pub async fn resolve_parseable_metadata(
                 overwrite_staging = true;
                 // overwrite remote in all and query mode
                 // because staging dir has changed.
-                match CONFIG.parseable.mode {
+                match CONFIG.options.mode {
                     Mode::All => {
                         standalone_after_distributed(metadata.server_mode)
                             .map_err(|err| {
@@ -150,13 +150,13 @@ pub async fn resolve_parseable_metadata(
                     },
                     Mode::Query => {
                         overwrite_remote = true;
-                        metadata.server_mode = CONFIG.parseable.mode;
+                        metadata.server_mode = CONFIG.options.mode;
                         metadata.staging = CONFIG.staging_dir().to_path_buf();
                     },
                     Mode::Ingest => {
                         // if ingest server is started fetch the metadata from remote
                         // update the server mode for local metadata
-                        metadata.server_mode = CONFIG.parseable.mode;
+                        metadata.server_mode = CONFIG.options.mode;
                         metadata.staging = CONFIG.staging_dir().to_path_buf();
                     },
                 }
@@ -168,7 +168,7 @@ pub async fn resolve_parseable_metadata(
             let metadata = StorageMetadata::default();
             // new metadata needs to be set
             // if mode is query or all then both staging and remote
-            match CONFIG.parseable.mode {
+            match CONFIG.options.mode {
                 Mode::All | Mode::Query => overwrite_remote = true,
                 _ => (),
             }
@@ -184,7 +184,7 @@ pub async fn resolve_parseable_metadata(
             ObjectStorageError::UnhandledError(err)
         })?;
 
-    metadata.server_mode = CONFIG.parseable.mode;
+    metadata.server_mode = CONFIG.options.mode;
     if overwrite_remote {
         put_remote_metadata(&metadata).await?;
     }
@@ -205,7 +205,7 @@ fn determine_environment(
     // if both staging and remote have same deployment id but different server modes
     if staging.deployment_id == remote.deployment_id
         && remote.server_mode == Mode::All
-        && (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest)
+        && (CONFIG.options.mode == Mode::Query || CONFIG.options.mode == Mode::Ingest)
     {
         EnvChange::NewStaging(remote)
     } else if staging.deployment_id != remote.deployment_id {
@@ -268,7 +268,7 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> {
 
 pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> {
     let mut staging_metadata = meta.clone();
-    staging_metadata.server_mode = CONFIG.parseable.mode;
+    staging_metadata.server_mode = CONFIG.options.mode;
     staging_metadata.staging = CONFIG.staging_dir().to_path_buf();
     let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
     let mut file = OpenOptions::new()
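The `determine_environment` change keeps the existing decision intact and only swaps the config path: a staging directory that shares the remote's deployment id, where the remote last ran as `All` but this process starts as `Query` or `Ingest`, is classified as new staging. Reduced to a predicate (a sketch; the real function returns an `EnvChange` carrying the metadata):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Query,
    Ingest,
    All,
}

// Condensed from the determine_environment hunk above.
fn needs_new_staging(same_deployment: bool, remote_mode: Mode, current_mode: Mode) -> bool {
    same_deployment
        && remote_mode == Mode::All
        && matches!(current_mode, Mode::Query | Mode::Ingest)
}

fn main() {
    // Standalone deployment being split into distributed roles.
    assert!(needs_new_staging(true, Mode::All, Mode::Ingest));
    // Restarting the same standalone deployment is not a new environment.
    assert!(!needs_new_staging(true, Mode::All, Mode::All));
}
```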
diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs
index 6781f2d6e..8a78eb957 100644
--- a/src/utils/arrow/flight.rs
+++ b/src/utils/arrow/flight.rs
@@ -135,7 +135,7 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool {
     );
     let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)];
 
-    CONFIG.parseable.mode == Mode::Query && include_now(&ex, &None)
+    CONFIG.options.mode == Mode::Query && include_now(&ex, &None)
 }
 
 fn lit_timestamp_milli(time: i64) -> Expr {
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 0dc5f9765..e539b9e9f 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -238,18 +238,18 @@ impl TimePeriod {
 }
 
 pub fn get_url() -> Url {
-    if CONFIG.parseable.ingestor_endpoint.is_empty() {
+    if CONFIG.options.ingestor_endpoint.is_empty() {
         return format!(
             "{}://{}",
-            CONFIG.parseable.get_scheme(),
-            CONFIG.parseable.address
+            CONFIG.options.get_scheme(),
+            CONFIG.options.address
         )
         .parse::<Url>() // if the value was improperly set, this will panic before hand
         .unwrap_or_else(|err| panic!("{}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.",
-            err, CONFIG.parseable.address));
+            err, CONFIG.options.address));
     }
 
-    let ingestor_endpoint = &CONFIG.parseable.ingestor_endpoint;
+    let ingestor_endpoint = &CONFIG.options.ingestor_endpoint;
 
     if ingestor_endpoint.starts_with("http") {
         panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
@@ -276,7 +276,7 @@ pub fn get_url() -> Url {
         if hostname.starts_with("http") {
             panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
         } else {
-            hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname);
+            hostname = format!("{}://{}", CONFIG.options.get_scheme(), hostname);
         }
     }
 
@@ -292,7 +292,7 @@ pub fn get_url() -> Url {
         }
     }
 
-    format!("{}://{}:{}", CONFIG.parseable.get_scheme(), hostname, port)
+    format!("{}://{}:{}", CONFIG.options.get_scheme(), hostname, port)
        .parse::<Url>()
        .expect("Valid URL")
 }
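`get_url` keeps its contract after the rename: `P_ADDR` and `P_INGESTOR_ENDPOINT` carry `host:port` with no scheme, and the scheme is prepended before parsing. The happy path, condensed (the helper name here is ours, not the crate's):

```rust
use url::Url;

// Assemble "<scheme>://<addr>" and parse it, panicking with a pointer to the
// docs on bad input -- mirroring get_url's behaviour for the P_ADDR path.
fn parse_server_url(scheme: &str, address: &str) -> Url {
    format!("{scheme}://{address}")
        .parse::<Url>()
        .unwrap_or_else(|err| {
            panic!("{err}, failed to parse `{address}` as Url. See https://logg.ing/env for details.")
        })
}

fn main() {
    let url = parse_server_url("http", "0.0.0.0:8000");
    assert_eq!(url.port(), Some(8000));
    println!("{url}");
}
```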