diff --git a/.gitignore b/.gitignore index 6bc2d7653..49467bf82 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ target data +staging examples Cargo.lock cert.pem diff --git a/server/Cargo.toml b/server/Cargo.toml index 020278b6c..522bc25fa 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -19,7 +19,15 @@ base64 = "0.20.0" bytes = "1" chrono = "0.4.19" chrono-humanize = "0.2.2" -clap = { version = "4.0.8", features = ["derive", "env"] } +clap = { version = "4.0.32", default-features = false, features = [ + "std", + "color", + "help", + "derive", + "env", + "cargo", + "error-context", +] } crossterm = "0.25" datafusion = "13.0" object_store = { version = "0.5.1", features = ["aws"] } diff --git a/server/src/option.rs b/server/src/option.rs index 4ba2bc354..291f9dfd4 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -16,7 +16,7 @@ * */ -use clap::{Parser, Subcommand}; +use clap::{command, value_parser, Arg, Args, Command, FromArgMatches}; use crossterm::style::Stylize; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -33,9 +33,6 @@ lazy_static::lazy_static! { pub static ref CONFIG: Arc = Arc::new(Config::new()); } -pub const USERNAME_ENV: &str = "P_USERNAME"; -pub const PASSWORD_ENV: &str = "P_PASSWORD"; - pub struct Config { pub parseable: Server, storage: Arc, @@ -44,18 +41,42 @@ pub struct Config { impl Config { fn new() -> Self { - let cli = Cli::parse(); - match cli.command { - SubCmd::ServerS3 { server, storage } => Config { - parseable: server, - storage: Arc::new(storage), - storage_name: "s3", - }, - SubCmd::ServerDrive { server, storage } => Config { - parseable: server, - storage: Arc::new(storage), - storage_name: "drive", - }, + let cli = parseable_cli_command().get_matches(); + + match cli.subcommand() { + Some(("--local-store", m)) => { + let server = match Server::from_arg_matches(m) { + Ok(server) => server, + Err(err) => err.exit(), + }; + let storage = match FSConfig::from_arg_matches(m) { + Ok(server) => server, + Err(err) => err.exit(), + }; + + Config { + parseable: server, + storage: Arc::new(storage), + storage_name: "drive", + } + } + Some(("--s3-store", m)) => { + let server = match Server::from_arg_matches(m) { + Ok(server) => server, + Err(err) => err.exit(), + }; + let storage = match S3Config::from_arg_matches(m) { + Ok(server) => server, + Err(err) => err.exit(), + }; + + Config { + parseable: server, + storage: Arc::new(storage), + storage_name: "s3", + } + } + _ => unreachable!(), } } @@ -133,104 +154,112 @@ impl Default for Config { } } -#[derive(Parser)] // requires `derive` feature -#[command( - name = "Parseable", - bin_name = "parseable", - about = "Parseable is a log storage and observability platform.", - version -)] -struct Cli { - #[command(subcommand)] - command: SubCmd, -} +fn parseable_cli_command() -> Command { + let local = Server::get_clap_command("--local-store"); + let local = ::augment_args_for_update(local); + + let local = local + .mut_arg(Server::USERNAME, |arg| { + arg.required(false).default_value("admin") + }) + .mut_arg(Server::PASSWORD, |arg| { + arg.required(false).default_value("admin") + }); + + let s3 = Server::get_clap_command("--s3-store"); + let s3 = ::augment_args_for_update(s3); -#[derive(Subcommand, Clone)] -enum SubCmd { - #[command(name = "--s3-store")] - ServerS3 { - #[command(flatten)] - server: Server, - #[command(flatten)] - storage: S3Config, - }, - #[command(name = "--local-store")] - ServerDrive { - #[command(flatten)] - server: Server, - #[command(flatten)] - storage: FSConfig, - }, + command!() + .name("Parseable") + .bin_name("parseable") + .about("Parseable is a log storage and observability platform.") + .propagate_version(true) + .next_line_help(false) + .help_template( + r#" +{name} - v{version} +{about-with-newline} +{all-args} +{after-help} +{author} + "#, + ) + .after_help("Checkout https://parseable.io for documentation") + .subcommand_required(true) + .subcommands([local, s3]) } -#[derive(clap::Args, Debug, Clone)] -#[clap(name = "server", about = "Start the Parseable server")] +#[derive(Debug, Default)] pub struct Server { /// The location of TLS Cert file - #[arg( - long, - env = "P_TLS_CERT_PATH", - value_name = "path", - value_parser = validation::file_path - )] pub tls_cert_path: Option, /// The location of TLS Private Key file - #[arg( - long, - env = "P_TLS_KEY_PATH", - value_name = "path", - value_parser = validation::file_path - )] pub tls_key_path: Option, /// The address on which the http server will listen. - #[arg( - long, - env = "P_ADDR", - default_value = "0.0.0.0:8000", - value_name = "url" - )] pub address: String, /// The local staging path is used as a temporary landing point /// for incoming events and local cache - #[arg( - long, - env = "P_STAGING_DIR", - default_value = "./data", - value_name = "path" - )] pub local_staging_path: PathBuf, /// Interval in seconds after which uncommited data would be /// uploaded to the storage platform. - #[arg( - long, - env = "P_STORAGE_UPLOAD_INTERVAL", - default_value = "60", - value_name = "seconds" - )] pub upload_interval: u64, /// Username for the basic authentication on the server - #[arg( - long, - env = USERNAME_ENV, - value_name = "username", - )] pub username: String, /// Password for the basic authentication on the server - #[arg( - long, - env = PASSWORD_ENV, - value_name = "password", - )] pub password: String, } +impl FromArgMatches for Server { + fn from_arg_matches(m: &clap::ArgMatches) -> Result { + let mut s: Self = Self::default(); + s.update_from_arg_matches(m)?; + Ok(s) + } + + fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { + self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); + self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); + self.address = m + .get_one::(Self::ADDRESS) + .cloned() + .expect("default value for address"); + self.local_staging_path = m + .get_one::(Self::STAGING) + .cloned() + .expect("default value for staging"); + self.upload_interval = m + .get_one::(Self::UPLOAD_INTERVAL) + .cloned() + .expect("default value for upload"); + self.username = m + .get_one::(Self::USERNAME) + .cloned() + .expect("default for username"); + self.password = m + .get_one::(Self::PASSWORD) + .cloned() + .expect("default for password"); + + Ok(()) + } +} + impl Server { + // identifiers for arguments + pub const TLS_CERT: &str = "tls-cert-path"; + pub const TLS_KEY: &str = "tls-key-path"; + pub const ADDRESS: &str = "address"; + pub const STAGING: &str = "local-staging-path"; + pub const UPLOAD_INTERVAL: &str = "upload-interval"; + pub const USERNAME: &str = "username"; + pub const PASSWORD: &str = "password"; + pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -242,10 +271,75 @@ impl Server { "http".to_string() } + + pub fn get_clap_command(name: &'static str) -> Command { + Command::new(name).next_line_help(false) + .arg( + Arg::new(Self::TLS_CERT) + .long(Self::TLS_CERT) + .env("P_TLS_CERT_PATH") + .value_name("PATH") + .value_parser(validation::file_path) + .help("The location of TLS Cert file"), + ) + .arg( + Arg::new(Self::TLS_KEY) + .long(Self::TLS_KEY) + .env("P_TLS_KEY_PATH") + .value_name("PATH") + .value_parser(validation::file_path) + .help("The location of TLS Private Key file"), + ) + .arg( + Arg::new(Self::ADDRESS) + .long(Self::ADDRESS) + .env("P_ADDR") + .value_name("ADDR:PORT") + .default_value("0.0.0.0:8000") + .value_parser(validation::socket_addr) + .help("The address on which the http server will listen."), + ) + .arg( + Arg::new(Self::STAGING) + .long(Self::STAGING) + .env("P_STAGING_DIR") + .value_name("DIR") + .default_value("./staging") + .value_parser(value_parser!(PathBuf)) + .help("The local staging path is used as a temporary landing point for incoming events and local cache") + .next_line_help(true), + ) + .arg( + Arg::new(Self::UPLOAD_INTERVAL) + .long(Self::UPLOAD_INTERVAL) + .env("P_STORAGE_UPLOAD_INTERVAL") + .value_name("SECONDS") + .default_value("60") + .value_parser(value_parser!(u64)) + .help("Interval in seconds after which uncommited data would be uploaded to the storage platform.") + .next_line_help(true), + ) + .arg( + Arg::new(Self::USERNAME) + .long(Self::USERNAME) + .env("P_USERNAME") + .value_name("STRING") + .required(true) + .help("Username for the basic authentication on the server"), + ) + .arg( + Arg::new(Self::PASSWORD) + .long(Self::PASSWORD) + .env("P_PASSWORD") + .value_name("STRING") + .required(true) + .help("Password for the basic authentication on the server"), + ) + } } pub mod validation { - use std::path::PathBuf; + use std::{net::ToSocketAddrs, path::PathBuf}; pub fn file_path(s: &str) -> Result { if s.is_empty() { @@ -260,4 +354,11 @@ pub mod validation { Ok(path) } + + pub fn socket_addr(s: &str) -> Result { + s.to_socket_addrs() + .is_ok() + .then_some(s.to_string()) + .ok_or_else(|| "Socket Address for server is invalid".to_string()) + } } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 6a1d12429..f0751bcea 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -43,10 +43,18 @@ use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider} #[derive(Debug, Clone, clap::Args)] #[command( name = "Local filesystem config", - about = "Start Parseable with local filesystem as storage backend (non production use only)" + about = "Start Parseable with local filesystem as storage backend (non production use only)", + help_template = "\ +{about-section} +{all-args} +" )] pub struct FSConfig { - #[arg(env = "P_FS_PATH", value_name = "filesystem path")] + #[arg( + env = "P_FS_PATH", + value_name = "filesystem path", + default_value = "./data" + )] root: PathBuf, } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 5408250d1..1907571e6 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -53,27 +53,41 @@ use super::ObjectStorageProvider; #[derive(Debug, Clone, clap::Args)] #[command( name = "S3 config", - about = "Start Parseable with AWS S3 or compatible as storage backend" + about = "Start Parseable with AWS S3 or compatible as storage backend", + help_template = "\ +{about-section} +{all-args} +" )] pub struct S3Config { /// The endpoint to AWS S3 or compatible object storage platform - #[arg(long, env = "P_S3_URL", value_name = "url")] + #[arg(long, env = "P_S3_URL", value_name = "url", required = true)] pub endpoint_url: String, /// The access key for AWS S3 or compatible object storage platform - #[arg(long, env = "P_S3_ACCESS_KEY", value_name = "access-key")] + #[arg( + long, + env = "P_S3_ACCESS_KEY", + value_name = "access-key", + required = true + )] pub access_key_id: String, /// The secret key for AWS S3 or compatible object storage platform - #[arg(long, env = "P_S3_SECRET_KEY", value_name = "secret-key")] + #[arg( + long, + env = "P_S3_SECRET_KEY", + value_name = "secret-key", + required = true + )] pub secret_key: String, /// The region for AWS S3 or compatible object storage platform - #[arg(long, env = "P_S3_REGION", value_name = "region")] + #[arg(long, env = "P_S3_REGION", value_name = "region", required = true)] pub region: String, /// The AWS S3 or compatible object storage bucket to be used for storage - #[arg(long, env = "P_S3_BUCKET", value_name = "bucket-name")] + #[arg(long, env = "P_S3_BUCKET", value_name = "bucket-name", required = true)] pub bucket_name: String, /// Set client to send content_md5 header on every put request