diff --git a/server/src/option.rs b/server/src/option.rs index cfa74e55e..56a26502b 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -16,6 +16,7 @@ * */ +use clap::builder::ArgPredicate; use clap::Parser; use crossterm::style::Stylize; use std::path::PathBuf; @@ -27,42 +28,45 @@ use crate::storage::{ObjectStorage, ObjectStorageError, LOCAL_SYNC_INTERVAL}; lazy_static::lazy_static! { #[derive(Debug)] - pub static ref CONFIG: Arc = { - let storage = Box::new(S3Config::parse()); - Arc::new(Config::new(storage)) - }; + pub static ref CONFIG: Arc> = Arc::new(Config::new()); } pub const USERNAME_ENV: &str = "P_USERNAME"; -pub const PASSOWRD_ENV: &str = "P_PASSWORD"; +pub const PASSWORD_ENV: &str = "P_PASSWORD"; pub const DEFAULT_USERNAME: &str = "parseable"; pub const DEFAULT_PASSWORD: &str = "parseable"; pub trait StorageOpt: Sync + Send { fn bucket_name(&self) -> &str; fn endpoint_url(&self) -> &str; - fn warning(&self); - fn is_default_url(&self) -> bool; } -pub struct Config { - pub parseable: Opt, - pub storage: Box, +pub struct Config +where + S: Clone + clap::Args + StorageOpt, +{ + pub parseable: Opt, } -impl Config { - fn new(storage: Box) -> Config { +impl Config +where + S: Clone + clap::Args + StorageOpt, +{ + fn new() -> Self { Config { - parseable: Opt::parse(), - storage, + parseable: Opt::::parse(), } } + pub fn storage(&self) -> &S { + &self.parseable.objectstore_config + } + pub fn print(&self) { let scheme = CONFIG.parseable.get_scheme(); self.status_info(&scheme); banner::version::print(); - self.warning(); + self.demo(); self.storage_info(); banner::system_info(); println!(); @@ -80,11 +84,11 @@ impl Config { Err(ObjectStorageError::NoSuchBucket(name)) => panic!( "Could not start because the bucket doesn't exist. Please ensure bucket {bucket} exists on {url}", bucket = name, - url = self.storage.endpoint_url() + url = self.storage().endpoint_url() ), Err(ObjectStorageError::ConnectionError(inner)) => panic!( "Failed to connect to the Object Storage Service on {url}\nCaused by: {cause}", - url = self.storage.endpoint_url(), + url = self.storage().endpoint_url(), cause = inner ), Err(ObjectStorageError::AuthenticationError(inner)) => panic!( @@ -96,15 +100,15 @@ impl Config { } fn status_info(&self, scheme: &str) { - let url = format!("{}://{}", scheme, CONFIG.parseable.address).underlined(); + let url = format!("{}://{}", scheme, self.parseable.address).underlined(); eprintln!( " {} {} {}", format!("Parseable server started at: {}", url).bold(), - format!("Username: {}", CONFIG.parseable.username).bold(), - format!("Password: {}", CONFIG.parseable.password).bold(), + format!("Username: {}", self.parseable.username).bold(), + format!("Password: {}", self.parseable.password).bold(), ) } @@ -116,60 +120,39 @@ impl Config { Object Storage: {}/{}", "Storage:".to_string().blue().bold(), self.parseable.local_disk_path.to_string_lossy(), - self.storage.endpoint_url(), - self.storage.bucket_name() + self.storage().endpoint_url(), + self.storage().bucket_name() ) } - fn warning(&self) { - match (self.storage.is_default_url(), self.is_default_cred()) { - (true, true) => { - banner::warning_line(); - self.cred_warning(); - self.storage.warning(); - } - (true, _) => { - banner::warning_line(); - self.storage.warning(); - } - (_, true) => { - banner::warning_line(); - self.cred_warning(); - } - _ => {} - } - } - - fn is_default_cred(&self) -> bool { - CONFIG.parseable.username == DEFAULT_USERNAME - && CONFIG.parseable.password == DEFAULT_PASSWORD - } - - fn cred_warning(&self) { - if self.is_default_cred() { + fn demo(&self) { + if self.is_demo() { + banner::warning_line(); eprintln!( " - {} {}", - "Parseable server is using default credentials." + "Parseable is in demo mode with default credentials and open object store. Please use this for demo purposes only." .to_string() .red(), - format!( - "Setup your credentials with {} and {} before storing production logs.", - USERNAME_ENV, PASSOWRD_ENV ) - .red() - ) } } + + fn is_demo(&self) -> bool { + self.parseable.demo + } } #[derive(Debug, Clone, Parser)] #[command( - name = "Parseable config", - about = "configuration for Parseable server" + name = "Parseable", + about = "Configuration for Parseable server", + version )] -pub struct Opt { +pub struct Opt +where + S: Clone + clap::Args + StorageOpt, +{ /// The location of TLS Cert file #[arg(long, env = "P_TLS_CERT_PATH")] pub tls_cert_path: Option, @@ -194,15 +177,33 @@ pub struct Opt { pub upload_interval: u64, /// Optional username to enable basic auth on the server - #[arg(long, env = USERNAME_ENV, default_value = DEFAULT_USERNAME)] + #[arg( + long, + env = USERNAME_ENV, + default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_USERNAME) + )] pub username: String, /// Optional password to enable basic auth on the server - #[arg(long, env = PASSOWRD_ENV, default_value = DEFAULT_PASSWORD)] + #[arg( + long, + env = PASSWORD_ENV, + default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_PASSWORD) + )] pub password: String, + + #[clap(flatten)] + pub objectstore_config: S, + + /// Run Parseable in demo mode with default credentials and open object store + #[arg(short, long, exclusive = true)] + pub demo: bool, } -impl Opt { +impl Opt +where + S: Clone + clap::Args + StorageOpt, +{ pub fn get_cache_path(&self, stream_name: &str) -> PathBuf { self.local_disk_path.join(stream_name) } diff --git a/server/src/s3.rs b/server/src/s3.rs index 1a4c35c9c..c57af4963 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -7,8 +7,7 @@ use aws_sdk_s3::RetryConfig; use aws_sdk_s3::{Client, Credentials, Endpoint, Region}; use aws_smithy_async::rt::sleep::default_async_sleep; use bytes::Bytes; -use clap::Parser; -use crossterm::style::Stylize; +use clap::builder::ArgPredicate; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -42,8 +41,6 @@ const DEFAULT_S3_BUCKET: &str = "parseable"; const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin"; const DEFAULT_S3_SECRET_KEY: &str = "minioadmin"; -const S3_URL_ENV_VAR: &str = "P_S3_URL"; - // max concurrent request allowed for datafusion object store const MAX_OBJECT_STORE_REQUESTS: usize = 1000; @@ -63,13 +60,13 @@ impl ObjectStoreFormat { lazy_static::lazy_static! { #[derive(Debug)] - pub static ref S3_CONFIG: Arc = Arc::new(S3Config::parse()); + pub static ref S3_CONFIG: Arc = Arc::new(CONFIG.storage().clone()); // runtime to be used in query session pub static ref STORAGE_RUNTIME: Arc = { let s3 = AmazonS3Builder::new() - .with_region(&S3_CONFIG.s3_default_region) + .with_region(&S3_CONFIG.s3_region) .with_endpoint(&S3_CONFIG.s3_endpoint_url) .with_bucket_name(&S3_CONFIG.s3_bucket_name) .with_access_key_id(&S3_CONFIG.s3_access_key_id) @@ -94,27 +91,47 @@ lazy_static::lazy_static! { }; } -#[derive(Debug, Clone, Parser)] +#[derive(Debug, Clone, clap::Args)] #[command(name = "S3 config", about = "configuration for AWS S3 SDK")] pub struct S3Config { /// The endpoint to AWS S3 or compatible object storage platform - #[arg(long, env = S3_URL_ENV_VAR, default_value = DEFAULT_S3_URL )] + #[arg( + long, + env = "P_S3_URL", + default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_URL) + )] pub s3_endpoint_url: String, /// The access key for AWS S3 or compatible object storage platform - #[arg(long, env = "P_S3_ACCESS_KEY", default_value = DEFAULT_S3_ACCESS_KEY)] + #[arg( + long, + env = "P_S3_ACCESS_KEY", + default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_ACCESS_KEY) + )] pub s3_access_key_id: String, /// The secret key for AWS S3 or compatible object storage platform - #[arg(long, env = "P_S3_SECRET_KEY", default_value = DEFAULT_S3_SECRET_KEY)] + #[arg( + long, + env = "P_S3_SECRET_KEY", + default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_SECRET_KEY) + )] pub s3_secret_key: String, /// The region for AWS S3 or compatible object storage platform - #[arg(long, env = "P_S3_REGION", default_value = DEFAULT_S3_REGION)] - pub s3_default_region: String, + #[arg( + long, + env = "P_S3_REGION", + default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_REGION) + )] + pub s3_region: String, /// The AWS S3 or compatible object storage bucket to be used for storage - #[arg(long, env = "P_S3_BUCKET", default_value = DEFAULT_S3_BUCKET)] + #[arg( + long, + env = "P_S3_BUCKET", + default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_BUCKET) + )] pub s3_bucket_name: String, } @@ -126,28 +143,6 @@ impl StorageOpt for S3Config { fn endpoint_url(&self) -> &str { &self.s3_endpoint_url } - - fn is_default_url(&self) -> bool { - self.s3_endpoint_url == DEFAULT_S3_URL - } - - fn warning(&self) { - if self.is_default_url() { - eprintln!( - " - {} - {}", - "Parseable server is using default object storage backend with public access." - .to_string() - .red(), - format!( - "Setup your object storage backend with {} before storing production logs.", - S3_URL_ENV_VAR - ) - .red() - ) - } - } } struct S3Options { @@ -160,7 +155,7 @@ impl S3Options { fn new() -> Self { let uri = S3_CONFIG.s3_endpoint_url.parse::().unwrap(); let endpoint = Endpoint::immutable(uri); - let region = Region::new(&S3_CONFIG.s3_default_region); + let region = Region::new(&S3_CONFIG.s3_region); let creds = Credentials::new( &S3_CONFIG.s3_access_key_id, &S3_CONFIG.s3_secret_key,