Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 62 additions & 61 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
*/

use clap::builder::ArgPredicate;
use clap::Parser;
use crossterm::style::Stylize;
use std::path::PathBuf;
Expand All @@ -27,42 +28,45 @@ use crate::storage::{ObjectStorage, ObjectStorageError, LOCAL_SYNC_INTERVAL};

lazy_static::lazy_static! {
#[derive(Debug)]
pub static ref CONFIG: Arc<Config> = {
let storage = Box::new(S3Config::parse());
Arc::new(Config::new(storage))
};
pub static ref CONFIG: Arc<Config<S3Config>> = Arc::new(Config::new());
}

pub const USERNAME_ENV: &str = "P_USERNAME";
pub const PASSOWRD_ENV: &str = "P_PASSWORD";
pub const PASSWORD_ENV: &str = "P_PASSWORD";
pub const DEFAULT_USERNAME: &str = "parseable";
pub const DEFAULT_PASSWORD: &str = "parseable";

pub trait StorageOpt: Sync + Send {
fn bucket_name(&self) -> &str;
fn endpoint_url(&self) -> &str;
fn warning(&self);
fn is_default_url(&self) -> bool;
}

pub struct Config {
pub parseable: Opt,
pub storage: Box<dyn StorageOpt>,
pub struct Config<S>
where
S: Clone + clap::Args + StorageOpt,
{
pub parseable: Opt<S>,
}

impl Config {
fn new(storage: Box<dyn StorageOpt>) -> Config {
impl<S> Config<S>
where
S: Clone + clap::Args + StorageOpt,
{
fn new() -> Self {
Config {
parseable: Opt::parse(),
storage,
parseable: Opt::<S>::parse(),
}
}

pub fn storage(&self) -> &S {
&self.parseable.objectstore_config
}

pub fn print(&self) {
let scheme = CONFIG.parseable.get_scheme();
self.status_info(&scheme);
banner::version::print();
self.warning();
self.demo();
self.storage_info();
banner::system_info();
println!();
Expand All @@ -80,11 +84,11 @@ impl Config {
Err(ObjectStorageError::NoSuchBucket(name)) => panic!(
"Could not start because the bucket doesn't exist. Please ensure bucket {bucket} exists on {url}",
bucket = name,
url = self.storage.endpoint_url()
url = self.storage().endpoint_url()
),
Err(ObjectStorageError::ConnectionError(inner)) => panic!(
"Failed to connect to the Object Storage Service on {url}\nCaused by: {cause}",
url = self.storage.endpoint_url(),
url = self.storage().endpoint_url(),
cause = inner
),
Err(ObjectStorageError::AuthenticationError(inner)) => panic!(
Expand All @@ -96,15 +100,15 @@ impl Config {
}

fn status_info(&self, scheme: &str) {
let url = format!("{}://{}", scheme, CONFIG.parseable.address).underlined();
let url = format!("{}://{}", scheme, self.parseable.address).underlined();
eprintln!(
"
{}
{}
{}",
format!("Parseable server started at: {}", url).bold(),
format!("Username: {}", CONFIG.parseable.username).bold(),
format!("Password: {}", CONFIG.parseable.password).bold(),
format!("Username: {}", self.parseable.username).bold(),
format!("Password: {}", self.parseable.password).bold(),
)
}

Expand All @@ -116,60 +120,39 @@ impl Config {
Object Storage: {}/{}",
"Storage:".to_string().blue().bold(),
self.parseable.local_disk_path.to_string_lossy(),
self.storage.endpoint_url(),
self.storage.bucket_name()
self.storage().endpoint_url(),
self.storage().bucket_name()
)
}

fn warning(&self) {
match (self.storage.is_default_url(), self.is_default_cred()) {
(true, true) => {
banner::warning_line();
self.cred_warning();
self.storage.warning();
}
(true, _) => {
banner::warning_line();
self.storage.warning();
}
(_, true) => {
banner::warning_line();
self.cred_warning();
}
_ => {}
}
}

fn is_default_cred(&self) -> bool {
CONFIG.parseable.username == DEFAULT_USERNAME
&& CONFIG.parseable.password == DEFAULT_PASSWORD
}

fn cred_warning(&self) {
if self.is_default_cred() {
fn demo(&self) {
if self.is_demo() {
banner::warning_line();
eprintln!(
"
{}
{}",
"Parseable server is using default credentials."
"Parseable is in demo mode with default credentials and open object store. Please use this for demo purposes only."
.to_string()
.red(),
format!(
"Setup your credentials with {} and {} before storing production logs.",
USERNAME_ENV, PASSOWRD_ENV
)
.red()
)
}
}

fn is_demo(&self) -> bool {
self.parseable.demo
}
}

#[derive(Debug, Clone, Parser)]
#[command(
name = "Parseable config",
about = "configuration for Parseable server"
name = "Parseable",
about = "Configuration for Parseable server",
version
)]
pub struct Opt {
pub struct Opt<S>
where
S: Clone + clap::Args + StorageOpt,
{
/// The location of TLS Cert file
#[arg(long, env = "P_TLS_CERT_PATH")]
pub tls_cert_path: Option<PathBuf>,
Expand All @@ -194,15 +177,33 @@ pub struct Opt {
pub upload_interval: u64,

/// Optional username to enable basic auth on the server
#[arg(long, env = USERNAME_ENV, default_value = DEFAULT_USERNAME)]
#[arg(
long,
env = USERNAME_ENV,
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_USERNAME)
)]
pub username: String,

/// Optional password to enable basic auth on the server
#[arg(long, env = PASSOWRD_ENV, default_value = DEFAULT_PASSWORD)]
#[arg(
long,
env = PASSWORD_ENV,
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_PASSWORD)
)]
pub password: String,

#[clap(flatten)]
pub objectstore_config: S,

/// Run Parseable in demo mode with default credentials and open object store
#[arg(short, long, exclusive = true)]
pub demo: bool,
}

impl Opt {
impl<S> Opt<S>
where
S: Clone + clap::Args + StorageOpt,
{
pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
self.local_disk_path.join(stream_name)
}
Expand Down
67 changes: 31 additions & 36 deletions server/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use aws_sdk_s3::RetryConfig;
use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
use aws_smithy_async::rt::sleep::default_async_sleep;
use bytes::Bytes;
use clap::Parser;
use crossterm::style::Stylize;
use clap::builder::ArgPredicate;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand Down Expand Up @@ -42,8 +41,6 @@ const DEFAULT_S3_BUCKET: &str = "parseable";
const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin";
const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";

const S3_URL_ENV_VAR: &str = "P_S3_URL";

// max concurrent request allowed for datafusion object store
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;

Expand All @@ -63,13 +60,13 @@ impl ObjectStoreFormat {

lazy_static::lazy_static! {
#[derive(Debug)]
pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(S3Config::parse());
pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(CONFIG.storage().clone());

// runtime to be used in query session
pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = {

let s3 = AmazonS3Builder::new()
.with_region(&S3_CONFIG.s3_default_region)
.with_region(&S3_CONFIG.s3_region)
.with_endpoint(&S3_CONFIG.s3_endpoint_url)
.with_bucket_name(&S3_CONFIG.s3_bucket_name)
.with_access_key_id(&S3_CONFIG.s3_access_key_id)
Expand All @@ -94,27 +91,47 @@ lazy_static::lazy_static! {
};
}

#[derive(Debug, Clone, Parser)]
#[derive(Debug, Clone, clap::Args)]
#[command(name = "S3 config", about = "configuration for AWS S3 SDK")]
pub struct S3Config {
/// The endpoint to AWS S3 or compatible object storage platform
#[arg(long, env = S3_URL_ENV_VAR, default_value = DEFAULT_S3_URL )]
#[arg(
long,
env = "P_S3_URL",
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_URL)
)]
pub s3_endpoint_url: String,

/// The access key for AWS S3 or compatible object storage platform
#[arg(long, env = "P_S3_ACCESS_KEY", default_value = DEFAULT_S3_ACCESS_KEY)]
#[arg(
long,
env = "P_S3_ACCESS_KEY",
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_ACCESS_KEY)
)]
pub s3_access_key_id: String,

/// The secret key for AWS S3 or compatible object storage platform
#[arg(long, env = "P_S3_SECRET_KEY", default_value = DEFAULT_S3_SECRET_KEY)]
#[arg(
long,
env = "P_S3_SECRET_KEY",
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_SECRET_KEY)
)]
pub s3_secret_key: String,

/// The region for AWS S3 or compatible object storage platform
#[arg(long, env = "P_S3_REGION", default_value = DEFAULT_S3_REGION)]
pub s3_default_region: String,
#[arg(
long,
env = "P_S3_REGION",
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_REGION)
)]
pub s3_region: String,

/// The AWS S3 or compatible object storage bucket to be used for storage
#[arg(long, env = "P_S3_BUCKET", default_value = DEFAULT_S3_BUCKET)]
#[arg(
long,
env = "P_S3_BUCKET",
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_BUCKET)
)]
pub s3_bucket_name: String,
}

Expand All @@ -126,28 +143,6 @@ impl StorageOpt for S3Config {
fn endpoint_url(&self) -> &str {
&self.s3_endpoint_url
}

fn is_default_url(&self) -> bool {
self.s3_endpoint_url == DEFAULT_S3_URL
}

fn warning(&self) {
if self.is_default_url() {
eprintln!(
"
{}
{}",
"Parseable server is using default object storage backend with public access."
.to_string()
.red(),
format!(
"Setup your object storage backend with {} before storing production logs.",
S3_URL_ENV_VAR
)
.red()
)
}
}
}

struct S3Options {
Expand All @@ -160,7 +155,7 @@ impl S3Options {
fn new() -> Self {
let uri = S3_CONFIG.s3_endpoint_url.parse::<Uri>().unwrap();
let endpoint = Endpoint::immutable(uri);
let region = Region::new(&S3_CONFIG.s3_default_region);
let region = Region::new(&S3_CONFIG.s3_region);
let creds = Credentials::new(
&S3_CONFIG.s3_access_key_id,
&S3_CONFIG.s3_secret_key,
Expand Down