Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
target
data
staging
examples
Cargo.lock
cert.pem
Expand Down
10 changes: 9 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@ base64 = "0.20.0"
bytes = "1"
chrono = "0.4.19"
chrono-humanize = "0.2.2"
clap = { version = "4.0.8", features = ["derive", "env"] }
clap = { version = "4.0.32", default-features = false, features = [
"std",
"color",
"help",
"derive",
"env",
"cargo",
"error-context",
] }
crossterm = "0.25"
datafusion = "13.0"
object_store = { version = "0.5.1", features = ["aws"] }
Expand Down
273 changes: 187 additions & 86 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

use clap::{Parser, Subcommand};
use clap::{command, value_parser, Arg, Args, Command, FromArgMatches};
use crossterm::style::Stylize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand All @@ -33,9 +33,6 @@ lazy_static::lazy_static! {
pub static ref CONFIG: Arc<Config> = Arc::new(Config::new());
}

pub const USERNAME_ENV: &str = "P_USERNAME";
pub const PASSWORD_ENV: &str = "P_PASSWORD";

pub struct Config {
pub parseable: Server,
storage: Arc<dyn ObjectStorageProvider + Send + Sync>,
Expand All @@ -44,18 +41,42 @@ pub struct Config {

impl Config {
fn new() -> Self {
let cli = Cli::parse();
match cli.command {
SubCmd::ServerS3 { server, storage } => Config {
parseable: server,
storage: Arc::new(storage),
storage_name: "s3",
},
SubCmd::ServerDrive { server, storage } => Config {
parseable: server,
storage: Arc::new(storage),
storage_name: "drive",
},
let cli = parseable_cli_command().get_matches();

match cli.subcommand() {
Some(("--local-store", m)) => {
let server = match Server::from_arg_matches(m) {
Ok(server) => server,
Err(err) => err.exit(),
};
let storage = match FSConfig::from_arg_matches(m) {
Ok(server) => server,
Err(err) => err.exit(),
};

Config {
parseable: server,
storage: Arc::new(storage),
storage_name: "drive",
}
}
Some(("--s3-store", m)) => {
let server = match Server::from_arg_matches(m) {
Ok(server) => server,
Err(err) => err.exit(),
};
let storage = match S3Config::from_arg_matches(m) {
Ok(server) => server,
Err(err) => err.exit(),
};

Config {
parseable: server,
storage: Arc::new(storage),
storage_name: "s3",
}
}
_ => unreachable!(),
}
}

Expand Down Expand Up @@ -133,104 +154,112 @@ impl Default for Config {
}
}

#[derive(Parser)] // requires `derive` feature
#[command(
name = "Parseable",
bin_name = "parseable",
about = "Parseable is a log storage and observability platform.",
version
)]
struct Cli {
#[command(subcommand)]
command: SubCmd,
}
fn parseable_cli_command() -> Command {
let local = Server::get_clap_command("--local-store");
let local = <FSConfig as Args>::augment_args_for_update(local);

let local = local
.mut_arg(Server::USERNAME, |arg| {
arg.required(false).default_value("admin")
})
.mut_arg(Server::PASSWORD, |arg| {
arg.required(false).default_value("admin")
});

let s3 = Server::get_clap_command("--s3-store");
let s3 = <S3Config as Args>::augment_args_for_update(s3);

#[derive(Subcommand, Clone)]
enum SubCmd {
#[command(name = "--s3-store")]
ServerS3 {
#[command(flatten)]
server: Server,
#[command(flatten)]
storage: S3Config,
},
#[command(name = "--local-store")]
ServerDrive {
#[command(flatten)]
server: Server,
#[command(flatten)]
storage: FSConfig,
},
command!()
.name("Parseable")
.bin_name("parseable")
.about("Parseable is a log storage and observability platform.")
.propagate_version(true)
.next_line_help(false)
.help_template(
r#"
{name} - v{version}
{about-with-newline}
{all-args}
{after-help}
{author}
"#,
)
.after_help("Checkout https://parseable.io for documentation")
.subcommand_required(true)
.subcommands([local, s3])
}

#[derive(clap::Args, Debug, Clone)]
#[clap(name = "server", about = "Start the Parseable server")]
#[derive(Debug, Default)]
pub struct Server {
/// The location of TLS Cert file
#[arg(
long,
env = "P_TLS_CERT_PATH",
value_name = "path",
value_parser = validation::file_path
)]
pub tls_cert_path: Option<PathBuf>,

/// The location of TLS Private Key file
#[arg(
long,
env = "P_TLS_KEY_PATH",
value_name = "path",
value_parser = validation::file_path
)]
pub tls_key_path: Option<PathBuf>,

/// The address on which the http server will listen.
#[arg(
long,
env = "P_ADDR",
default_value = "0.0.0.0:8000",
value_name = "url"
)]
pub address: String,

/// The local staging path is used as a temporary landing point
/// for incoming events and local cache
#[arg(
long,
env = "P_STAGING_DIR",
default_value = "./data",
value_name = "path"
)]
pub local_staging_path: PathBuf,

/// Interval in seconds after which uncommited data would be
/// uploaded to the storage platform.
#[arg(
long,
env = "P_STORAGE_UPLOAD_INTERVAL",
default_value = "60",
value_name = "seconds"
)]
pub upload_interval: u64,

/// Username for the basic authentication on the server
#[arg(
long,
env = USERNAME_ENV,
value_name = "username",
)]
pub username: String,

/// Password for the basic authentication on the server
#[arg(
long,
env = PASSWORD_ENV,
value_name = "password",
)]
pub password: String,
}

impl FromArgMatches for Server {
fn from_arg_matches(m: &clap::ArgMatches) -> Result<Self, clap::Error> {
let mut s: Self = Self::default();
s.update_from_arg_matches(m)?;
Ok(s)
}

fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.address = m
.get_one::<String>(Self::ADDRESS)
.cloned()
.expect("default value for address");
self.local_staging_path = m
.get_one::<PathBuf>(Self::STAGING)
.cloned()
.expect("default value for staging");
self.upload_interval = m
.get_one::<u64>(Self::UPLOAD_INTERVAL)
.cloned()
.expect("default value for upload");
self.username = m
.get_one::<String>(Self::USERNAME)
.cloned()
.expect("default for username");
self.password = m
.get_one::<String>(Self::PASSWORD)
.cloned()
.expect("default for password");

Ok(())
}
}

impl Server {
// identifiers for arguments
pub const TLS_CERT: &str = "tls-cert-path";
pub const TLS_KEY: &str = "tls-key-path";
pub const ADDRESS: &str = "address";
pub const STAGING: &str = "local-staging-path";
pub const UPLOAD_INTERVAL: &str = "upload-interval";
pub const USERNAME: &str = "username";
pub const PASSWORD: &str = "password";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
}
Expand All @@ -242,10 +271,75 @@ impl Server {

"http".to_string()
}

pub fn get_clap_command(name: &'static str) -> Command {
Command::new(name).next_line_help(false)
.arg(
Arg::new(Self::TLS_CERT)
.long(Self::TLS_CERT)
.env("P_TLS_CERT_PATH")
.value_name("PATH")
.value_parser(validation::file_path)
.help("The location of TLS Cert file"),
)
.arg(
Arg::new(Self::TLS_KEY)
.long(Self::TLS_KEY)
.env("P_TLS_KEY_PATH")
.value_name("PATH")
.value_parser(validation::file_path)
.help("The location of TLS Private Key file"),
)
.arg(
Arg::new(Self::ADDRESS)
.long(Self::ADDRESS)
.env("P_ADDR")
.value_name("ADDR:PORT")
.default_value("0.0.0.0:8000")
.value_parser(validation::socket_addr)
.help("The address on which the http server will listen."),
)
.arg(
Arg::new(Self::STAGING)
.long(Self::STAGING)
.env("P_STAGING_DIR")
.value_name("DIR")
.default_value("./staging")
.value_parser(value_parser!(PathBuf))
.help("The local staging path is used as a temporary landing point for incoming events and local cache")
.next_line_help(true),
)
.arg(
Arg::new(Self::UPLOAD_INTERVAL)
.long(Self::UPLOAD_INTERVAL)
.env("P_STORAGE_UPLOAD_INTERVAL")
.value_name("SECONDS")
.default_value("60")
.value_parser(value_parser!(u64))
.help("Interval in seconds after which uncommited data would be uploaded to the storage platform.")
.next_line_help(true),
)
.arg(
Arg::new(Self::USERNAME)
.long(Self::USERNAME)
.env("P_USERNAME")
.value_name("STRING")
.required(true)
.help("Username for the basic authentication on the server"),
)
.arg(
Arg::new(Self::PASSWORD)
.long(Self::PASSWORD)
.env("P_PASSWORD")
.value_name("STRING")
.required(true)
.help("Password for the basic authentication on the server"),
)
}
}

pub mod validation {
use std::path::PathBuf;
use std::{net::ToSocketAddrs, path::PathBuf};

pub fn file_path(s: &str) -> Result<PathBuf, String> {
if s.is_empty() {
Expand All @@ -260,4 +354,11 @@ pub mod validation {

Ok(path)
}

pub fn socket_addr(s: &str) -> Result<String, String> {
s.to_socket_addrs()
.is_ok()
.then_some(s.to_string())
.ok_or_else(|| "Socket Address for server is invalid".to_string())
}
}
12 changes: 10 additions & 2 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,18 @@ use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider}
#[derive(Debug, Clone, clap::Args)]
#[command(
name = "Local filesystem config",
about = "Start Parseable with local filesystem as storage backend (non production use only)"
about = "Start Parseable with local filesystem as storage backend (non production use only)",
help_template = "\
{about-section}
{all-args}
"
)]
pub struct FSConfig {
#[arg(env = "P_FS_PATH", value_name = "filesystem path")]
#[arg(
env = "P_FS_PATH",
value_name = "filesystem path",
default_value = "./data"
)]
root: PathBuf,
}

Expand Down
Loading