From 7d9b006d5aee5934322213ba71d13c626aa63beb Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 19 Dec 2022 17:06:26 +0530 Subject: [PATCH 1/7] Redefine CLI subcommands Subcommands are moved to top level of cli and can be specified with `--s3` and `--drive`. --- server/src/event.rs | 2 +- server/src/handlers/event.rs | 1 - server/src/handlers/logstream.rs | 2 +- server/src/handlers/mod.rs | 2 +- server/src/main.rs | 2 - server/src/option.rs | 86 ++++++++++++++++---------------- server/src/query.rs | 5 +- 7 files changed, 47 insertions(+), 53 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 661e8e133..6cbc1ee12 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -36,7 +36,7 @@ use std::sync::RwLock; use crate::metadata; use crate::metadata::LOCK_EXPECT; use crate::option::CONFIG; -use crate::storage::{ObjectStorageProvider, StorageDir}; +use crate::storage::StorageDir; use self::error::{EventError, StreamWriterError}; diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 8346247e1..b59a17dcd 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -25,7 +25,6 @@ use crate::event; use crate::option::CONFIG; use crate::query::Query; use crate::response::QueryResponse; -use crate::storage::ObjectStorageProvider; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use crate::utils::{self, flatten_json_body, merge}; diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index cebf77865..49125250e 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -25,7 +25,7 @@ use serde_json::Value; use crate::alerts::Alerts; use crate::option::CONFIG; -use crate::storage::{ObjectStorageProvider, StorageDir}; +use crate::storage::StorageDir; use crate::{event, response}; use crate::{metadata, validator}; diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index 7d2127f0e..ba13de982 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -22,7 +22,7 @@ pub mod logstream; use actix_web::http::StatusCode; use actix_web::HttpResponse; -use crate::{option::CONFIG, storage::ObjectStorageProvider}; +use crate::option::CONFIG; pub async fn liveness() -> HttpResponse { HttpResponse::new(StatusCode::OK) diff --git a/server/src/main.rs b/server/src/main.rs index 40e47ed4f..b80cfe875 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -54,8 +54,6 @@ mod validator; use option::CONFIG; -use crate::storage::ObjectStorageProvider; - // Global configurations const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000; const API_BASE_PATH: &str = "/api"; diff --git a/server/src/option.rs b/server/src/option.rs index 067c6417c..a9ddd0d44 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -40,17 +40,22 @@ pub const DEFAULT_PASSWORD: &str = "parseable"; pub struct Config { pub parseable: Server, + storage: Arc, } impl Config { fn new() -> Self { - let Cli::Server(args) = match Cli::try_parse() { - Ok(s) => s, - Err(e) => { - e.exit(); - } - }; - Config { parseable: args } + let cli = Cli::parse(); + match cli.command { + SubCmd::ServerS3 { server, storage } => Config { + parseable: server, + storage: Arc::new(storage), + }, + SubCmd::ServerDrive { server, storage } => Config { + parseable: server, + storage: Arc::new(storage), + }, + } } pub fn print(&self) { @@ -108,7 +113,7 @@ impl Config { Object Storage: {}", "Storage:".to_string().blue().bold(), self.parseable.local_disk_path.to_string_lossy(), - 
self.parseable.object_store.get_endpoint(), + self.storage().get_endpoint(), ) } @@ -129,8 +134,14 @@ impl Config { self.parseable.demo } - pub fn storage(&self) -> &impl ObjectStorageProvider { - &self.parseable.object_store + pub fn storage(&self) -> Arc { + self.storage.clone() + } +} + +impl Default for Config { + fn default() -> Self { + Self::new() } } @@ -141,8 +152,27 @@ impl Config { about = "Parseable is a log storage and observability platform.", version )] -enum Cli { - Server(Server), +struct Cli { + #[command(subcommand)] + command: SubCmd, +} + +#[derive(Subcommand, Clone)] +enum SubCmd { + #[command(name = "--s3")] + ServerS3 { + #[command(flatten)] + server: Server, + #[command(flatten)] + storage: S3Config, + }, + #[command(name = "--drive")] + ServerDrive { + #[command(flatten)] + server: Server, + #[command(flatten)] + storage: FSConfig, + }, } #[derive(clap::Args, Debug, Clone)] @@ -214,43 +244,11 @@ pub struct Server { )] pub password: String, - #[command(subcommand)] - pub object_store: ObjectStore, - /// Run Parseable in demo mode with default credentials and open object store #[arg(short, long, exclusive = true)] pub demo: bool, } -#[derive(Debug, Clone, Subcommand)] -pub enum ObjectStore { - Drive(FSConfig), - S3(S3Config), -} - -impl ObjectStorageProvider for ObjectStore { - fn get_datafusion_runtime(&self) -> Arc { - match self { - ObjectStore::Drive(x) => x.get_datafusion_runtime(), - ObjectStore::S3(x) => x.get_datafusion_runtime(), - } - } - - fn get_object_store(&self) -> Arc { - match self { - ObjectStore::Drive(x) => x.get_object_store(), - ObjectStore::S3(x) => x.get_object_store(), - } - } - - fn get_endpoint(&self) -> String { - match self { - ObjectStore::Drive(x) => x.get_endpoint(), - ObjectStore::S3(x) => x.get_endpoint(), - } - } -} - impl Server { pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_disk_path.join(stream_name) diff --git a/server/src/query.rs b/server/src/query.rs index e6a092fc7..c4521c46b 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -30,10 +30,9 @@ use std::path::PathBuf; use std::sync::Arc; use crate::option::CONFIG; -use crate::storage::ObjectStorage; use crate::storage::ObjectStorageError; use crate::storage::StorageDir; -use crate::storage::{self, ObjectStorageProvider}; +use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY}; use crate::utils::TimePeriod; use crate::validator; @@ -68,7 +67,7 @@ impl Query { /// Return prefixes, each per day/hour/minutes as necessary pub fn get_prefixes(&self) -> Vec { - TimePeriod::new(self.start, self.end, storage::OBJECT_STORE_DATA_GRANULARITY) + TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY) .generate_prefixes(&self.stream_name) } From 666ffca6136eebba4717f209b29aab26ff171335 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 19 Dec 2022 21:23:30 +0530 Subject: [PATCH 2/7] Change local staging env var --- server/src/option.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/option.rs b/server/src/option.rs index a9ddd0d44..3072b7e2f 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -210,7 +210,7 @@ pub struct Server { /// from object storage backend #[arg( long, - env = "P_LOCAL_STORAGE", + env = "P_LOCAL_STAGE_PATH", default_value = "./data", value_name = "path" )] From b3fa075cb92bb4cbbd34c73b0bc7933daf2540ce Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 19 Dec 2022 21:24:41 +0530 Subject: [PATCH 3/7] Remove defaults and demo --- server/src/option.rs | 8 
-------- server/src/storage/s3.rs | 44 +++++----------------------------------- 2 files changed, 5 insertions(+), 47 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 3072b7e2f..d8e4e787c 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -35,8 +35,6 @@ lazy_static::lazy_static! { pub const USERNAME_ENV: &str = "P_USERNAME"; pub const PASSWORD_ENV: &str = "P_PASSWORD"; -pub const DEFAULT_USERNAME: &str = "parseable"; -pub const DEFAULT_PASSWORD: &str = "parseable"; pub struct Config { pub parseable: Server, @@ -231,7 +229,6 @@ pub struct Server { long, env = USERNAME_ENV, value_name = "username", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_USERNAME) )] pub username: String, @@ -240,13 +237,8 @@ pub struct Server { long, env = PASSWORD_ENV, value_name = "password", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_PASSWORD) )] pub password: String, - - /// Run Parseable in demo mode with default credentials and open object store - #[arg(short, long, exclusive = true)] - pub demo: bool, } impl Server { diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index d2fff743f..1c2ce6e6e 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -51,61 +51,27 @@ use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; use super::ObjectStorageProvider; -// Default object storage currently is DO Spaces bucket -// Any user who starts the Parseable server with default configuration -// will point to this bucket and will see any data present on this bucket -const DEFAULT_S3_URL: &str = "https://minio.parseable.io:9000"; -const DEFAULT_S3_REGION: &str = "us-east-1"; -const DEFAULT_S3_BUCKET: &str = "parseable"; -const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin"; -const DEFAULT_S3_SECRET_KEY: &str = "minioadmin"; - #[derive(Debug, Clone, clap::Args)] #[command(name = "S3 config", about = "configuration for AWS S3 SDK")] pub struct S3Config { /// The endpoint to AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_URL", - value_name = "url", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_URL) - )] + #[arg(long, env = "P_S3_URL", value_name = "url")] pub s3_endpoint_url: String, /// The access key for AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_ACCESS_KEY", - value_name = "access-key", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_ACCESS_KEY) - )] + #[arg(long, env = "P_S3_ACCESS_KEY", value_name = "access-key")] pub s3_access_key_id: String, /// The secret key for AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_SECRET_KEY", - value_name = "secret-key", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_SECRET_KEY) - )] + #[arg(long, env = "P_S3_SECRET_KEY", value_name = "secret-key")] pub s3_secret_key: String, /// The region for AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_REGION", - value_name = "region", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_REGION) - )] + #[arg(long, env = "P_S3_REGION", value_name = "region")] pub s3_region: String, /// The AWS S3 or compatible object storage bucket to be used for storage - #[arg( - long, - env = "P_S3_BUCKET", - value_name = "bucket-name", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_BUCKET) - )] + #[arg(long, env = "P_S3_BUCKET", value_name = "bucket-name")] pub s3_bucket_name: String, /// Set client to send content_md5 header on every put request From 
6547d5a7fa37fe40ef101392c16e3f8fe7ccd0c9 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 19 Dec 2022 21:26:39 +0530 Subject: [PATCH 4/7] Remove demo --- server/src/banner.rs | 1 + server/src/option.rs | 19 ------------------- server/src/storage/s3.rs | 1 - 3 files changed, 1 insertion(+), 20 deletions(-) diff --git a/server/src/banner.rs b/server/src/banner.rs index 5345dca85..d0f2fcee6 100644 --- a/server/src/banner.rs +++ b/server/src/banner.rs @@ -36,6 +36,7 @@ pub fn system_info() { ) } +#[allow(dead_code)] pub fn warning_line() { eprint!( " diff --git a/server/src/option.rs b/server/src/option.rs index d8e4e787c..e7bbeb242 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -16,7 +16,6 @@ * */ -use clap::builder::ArgPredicate; use clap::{Parser, Subcommand}; use crossterm::style::Stylize; use std::path::PathBuf; @@ -60,7 +59,6 @@ impl Config { let scheme = CONFIG.parseable.get_scheme(); self.status_info(&scheme); banner::version::print(); - self.demo(); self.storage_info(); banner::system_info(); println!(); @@ -115,23 +113,6 @@ impl Config { ) } - fn demo(&self) { - if self.is_demo() { - banner::warning_line(); - eprintln!( - " - {}", - "Parseable is in demo mode with default credentials and open object store. Please use this for demo purposes only." - .to_string() - .red(), - ) - } - } - - fn is_demo(&self) -> bool { - self.parseable.demo - } - pub fn storage(&self) -> Arc { self.storage.clone() } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 1c2ce6e6e..6d57c057f 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -26,7 +26,6 @@ use aws_sdk_s3::{Client, Credentials, Endpoint, Region}; use aws_smithy_async::rt::sleep::default_async_sleep; use base64::encode; use bytes::Bytes; -use clap::builder::ArgPredicate; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ From 2c6ed5a744455cec60878a933fea27bb8d75bec9 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 19 Dec 2022 21:31:13 +0530 Subject: [PATCH 5/7] Change staging dir env --- server/src/option.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/option.rs b/server/src/option.rs index e7bbeb242..e9cdecfb6 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -189,7 +189,7 @@ pub struct Server { /// from object storage backend #[arg( long, - env = "P_LOCAL_STAGE_PATH", + env = "P_STAGING_DIR", default_value = "./data", value_name = "path" )] From 6f8a4bb0ef2ccd5da43ca396f6a04444e06deddf Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 20 Dec 2022 12:26:37 +0530 Subject: [PATCH 6/7] Fix --- server/src/option.rs | 25 ++++++++++++---------- server/src/storage/object_storage.rs | 2 +- server/src/storage/s3.rs | 32 ++++++++++++++-------------- 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index e9cdecfb6..b41ad3dc3 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -18,7 +18,7 @@ use clap::{Parser, Subcommand}; use crossterm::style::Stylize; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use crate::banner; @@ -108,7 +108,7 @@ impl Config { Local Data Path: {} Object Storage: {}", "Storage:".to_string().blue().bold(), - self.parseable.local_disk_path.to_string_lossy(), + self.staging_dir().to_string_lossy(), self.storage().get_endpoint(), ) } @@ -116,6 +116,10 @@ impl Config { pub fn storage(&self) -> Arc { self.storage.clone() } + + pub fn staging_dir(&self) -> &Path { + 
&self.parseable.local_staging_path + } } impl Default for Config { @@ -184,19 +188,18 @@ pub struct Server { )] pub address: String, - /// The local storage path is used as temporary landing point - /// for incoming events and local cache while querying data pulled - /// from object storage backend + /// The local staging path is used as a temporary landing point + /// for incoming events and local cache #[arg( long, env = "P_STAGING_DIR", default_value = "./data", value_name = "path" )] - pub local_disk_path: PathBuf, + pub local_staging_path: PathBuf, - /// Optional interval after which server would upload uncommited data to - /// remote object storage platform. Defaults to 1min. + /// Interval in seconds after which uncommited data would be + /// uploaded to the storage platform. #[arg( long, env = "P_STORAGE_UPLOAD_INTERVAL", @@ -205,7 +208,7 @@ pub struct Server { )] pub upload_interval: u64, - /// Optional username to enable basic auth on the server + /// Username for the basic authentication on the server #[arg( long, env = USERNAME_ENV, @@ -213,7 +216,7 @@ pub struct Server { )] pub username: String, - /// Optional password to enable basic auth on the server + /// Password for the basic authentication on the server #[arg( long, env = PASSWORD_ENV, @@ -224,7 +227,7 @@ pub struct Server { impl Server { pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { - self.local_disk_path.join(stream_name) + self.local_staging_path.join(stream_name) } pub fn get_scheme(&self) -> String { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 2f852210f..056e5d447 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -145,7 +145,7 @@ pub trait ObjectStorage: Sync + 'static { } async fn sync(&self) -> Result<(), MoveDataError> { - if !Path::new(&CONFIG.parseable.local_disk_path).exists() { + if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 6d57c057f..2a430287d 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -55,23 +55,23 @@ use super::ObjectStorageProvider; pub struct S3Config { /// The endpoint to AWS S3 or compatible object storage platform #[arg(long, env = "P_S3_URL", value_name = "url")] - pub s3_endpoint_url: String, + pub endpoint_url: String, /// The access key for AWS S3 or compatible object storage platform #[arg(long, env = "P_S3_ACCESS_KEY", value_name = "access-key")] - pub s3_access_key_id: String, + pub access_key_id: String, /// The secret key for AWS S3 or compatible object storage platform #[arg(long, env = "P_S3_SECRET_KEY", value_name = "secret-key")] - pub s3_secret_key: String, + pub secret_key: String, /// The region for AWS S3 or compatible object storage platform #[arg(long, env = "P_S3_REGION", value_name = "region")] - pub s3_region: String, + pub region: String, /// The AWS S3 or compatible object storage bucket to be used for storage #[arg(long, env = "P_S3_BUCKET", value_name = "bucket-name")] - pub s3_bucket_name: String, + pub bucket_name: String, /// Set client to send content_md5 header on every put request #[arg( @@ -86,11 +86,11 @@ pub struct S3Config { impl ObjectStorageProvider for S3Config { fn get_datafusion_runtime(&self) -> Arc { let s3 = AmazonS3Builder::new() - .with_region(&self.s3_region) - .with_endpoint(&self.s3_endpoint_url) - .with_bucket_name(&self.s3_bucket_name) - .with_access_key_id(&self.s3_access_key_id) - 
.with_secret_access_key(&self.s3_secret_key) + .with_region(&self.region) + .with_endpoint(&self.endpoint_url) + .with_bucket_name(&self.bucket_name) + .with_access_key_id(&self.access_key_id) + .with_secret_access_key(&self.secret_key) // allow http for local instances .with_allow_http(true) .build() @@ -100,7 +100,7 @@ impl ObjectStorageProvider for S3Config { let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); let object_store_registry = ObjectStoreRegistry::new(); - object_store_registry.register_store("s3", &self.s3_bucket_name, Arc::new(s3)); + object_store_registry.register_store("s3", &self.bucket_name, Arc::new(s3)); let config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)); @@ -111,10 +111,10 @@ impl ObjectStorageProvider for S3Config { } fn get_object_store(&self) -> Arc { - let uri = self.s3_endpoint_url.parse::().unwrap(); + let uri = self.endpoint_url.parse::().unwrap(); let endpoint = Endpoint::immutable(uri); - let region = Region::new(self.s3_region.clone()); - let creds = Credentials::new(&self.s3_access_key_id, &self.s3_secret_key, None, None, ""); + let region = Region::new(self.region.clone()); + let creds = Credentials::new(&self.access_key_id, &self.secret_key, None, None, ""); let config = aws_sdk_s3::Config::builder() .region(region) @@ -128,13 +128,13 @@ impl ObjectStorageProvider for S3Config { Arc::new(S3 { client, - bucket: self.s3_bucket_name.clone(), + bucket: self.bucket_name.clone(), set_content_md5: self.content_md5, }) } fn get_endpoint(&self) -> String { - format!("{}/{}", self.s3_endpoint_url, self.s3_bucket_name) + format!("{}/{}", self.endpoint_url, self.bucket_name) } } From 81a316edfc693c1609d3340768bc114dd9048256 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 20 Dec 2022 12:49:21 +0530 Subject: [PATCH 7/7] Remove flag --- server/src/storage/localfs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 18852cc2d..34a75ac0b 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -46,7 +46,7 @@ use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider} about = "configuration for using local filesystem for storage" )] pub struct FSConfig { - #[arg(long, env = "P_FS_PATH", value_name = "path")] + #[arg(env = "P_FS_PATH", value_name = "filesystem path")] root: PathBuf, }
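
Usage note (a sketch, not part of the patch series): with storage selection lifted to top-level subcommands, the backend is chosen at startup, and naming the clap subcommands "--s3" and "--drive" lets them read like flags while keeping the two storage configurations mutually exclusive. The invocation below is a minimal sketch that assumes the binary is run as `parseable` and uses only the environment variables visible in the diffs above; the exact long-flag spellings are whatever clap derives from the field names, and every value shown is a placeholder (demo mode and the built-in default credentials are removed in patches 3 and 4, so P_USERNAME and P_PASSWORD must be supplied).

    # Local filesystem backend; after patch 7 the root path is taken from P_FS_PATH.
    P_USERNAME=<user> P_PASSWORD=<pass> P_STAGING_DIR=./staging \
    P_FS_PATH=./storage \
    parseable --drive

    # S3 or compatible backend.
    P_USERNAME=<user> P_PASSWORD=<pass> P_STAGING_DIR=./staging \
    P_S3_URL=http://localhost:9000 \
    P_S3_ACCESS_KEY=<access-key> P_S3_SECRET_KEY=<secret-key> \
    P_S3_REGION=us-east-1 P_S3_BUCKET=<bucket> \
    parseable --s3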