Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub fn system_info() {
)
}

#[allow(dead_code)]
pub fn warning_line() {
eprint!(
"
Expand Down
2 changes: 1 addition & 1 deletion server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::sync::RwLock;
use crate::metadata;
use crate::metadata::LOCK_EXPECT;
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};
use crate::storage::StorageDir;

use self::error::{EventError, StreamWriterError};

Expand Down
1 change: 0 additions & 1 deletion server/src/handlers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::event;
use crate::option::CONFIG;
use crate::query::Query;
use crate::response::QueryResponse;
use crate::storage::ObjectStorageProvider;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
use crate::utils::{self, flatten_json_body, merge};

Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use serde_json::Value;

use crate::alerts::Alerts;
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};
use crate::storage::StorageDir;
use crate::{event, response};
use crate::{metadata, validator};

Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod logstream;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;

use crate::{option::CONFIG, storage::ObjectStorageProvider};
use crate::option::CONFIG;

pub async fn liveness() -> HttpResponse {
HttpResponse::new(StatusCode::OK)
Expand Down
2 changes: 0 additions & 2 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ mod validator;

use option::CONFIG;

use crate::storage::ObjectStorageProvider;

// Global configurations
const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000;
const API_BASE_PATH: &str = "/api";
Expand Down
132 changes: 53 additions & 79 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
*
*/

use clap::builder::ArgPredicate;
use clap::{Parser, Subcommand};
use crossterm::style::Stylize;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::banner;
Expand All @@ -35,29 +34,31 @@ lazy_static::lazy_static! {

pub const USERNAME_ENV: &str = "P_USERNAME";
pub const PASSWORD_ENV: &str = "P_PASSWORD";
pub const DEFAULT_USERNAME: &str = "parseable";
pub const DEFAULT_PASSWORD: &str = "parseable";

pub struct Config {
pub parseable: Server,
storage: Arc<dyn ObjectStorageProvider + Send + Sync>,
}

impl Config {
fn new() -> Self {
let Cli::Server(args) = match Cli::try_parse() {
Ok(s) => s,
Err(e) => {
e.exit();
}
};
Config { parseable: args }
let cli = Cli::parse();
match cli.command {
SubCmd::ServerS3 { server, storage } => Config {
parseable: server,
storage: Arc::new(storage),
},
SubCmd::ServerDrive { server, storage } => Config {
parseable: server,
storage: Arc::new(storage),
},
}
}

pub fn print(&self) {
let scheme = CONFIG.parseable.get_scheme();
self.status_info(&scheme);
banner::version::print();
self.demo();
self.storage_info();
banner::system_info();
println!();
Expand Down Expand Up @@ -107,30 +108,23 @@ impl Config {
Local Data Path: {}
Object Storage: {}",
"Storage:".to_string().blue().bold(),
self.parseable.local_disk_path.to_string_lossy(),
self.parseable.object_store.get_endpoint(),
self.staging_dir().to_string_lossy(),
self.storage().get_endpoint(),
)
}

fn demo(&self) {
if self.is_demo() {
banner::warning_line();
eprintln!(
"
{}",
"Parseable is in demo mode with default credentials and open object store. Please use this for demo purposes only."
.to_string()
.red(),
)
}
pub fn storage(&self) -> Arc<dyn ObjectStorageProvider + Send + Sync> {
self.storage.clone()
}

fn is_demo(&self) -> bool {
self.parseable.demo
pub fn staging_dir(&self) -> &Path {
&self.parseable.local_staging_path
}
}

pub fn storage(&self) -> &impl ObjectStorageProvider {
&self.parseable.object_store
impl Default for Config {
fn default() -> Self {
Self::new()
}
}

Expand All @@ -141,8 +135,27 @@ impl Config {
about = "Parseable is a log storage and observability platform.",
version
)]
enum Cli {
Server(Server),
struct Cli {
#[command(subcommand)]
command: SubCmd,
}

#[derive(Subcommand, Clone)]
enum SubCmd {
#[command(name = "--s3")]
ServerS3 {
#[command(flatten)]
server: Server,
#[command(flatten)]
storage: S3Config,
},
#[command(name = "--drive")]
ServerDrive {
#[command(flatten)]
server: Server,
#[command(flatten)]
storage: FSConfig,
},
}

#[derive(clap::Args, Debug, Clone)]
Expand Down Expand Up @@ -175,19 +188,18 @@ pub struct Server {
)]
pub address: String,

/// The local storage path is used as temporary landing point
/// for incoming events and local cache while querying data pulled
/// from object storage backend
/// The local staging path is used as a temporary landing point
/// for incoming events and local cache
#[arg(
long,
env = "P_LOCAL_STORAGE",
env = "P_STAGING_DIR",
default_value = "./data",
value_name = "path"
)]
pub local_disk_path: PathBuf,
pub local_staging_path: PathBuf,

/// Optional interval after which server would upload uncommited data to
/// remote object storage platform. Defaults to 1min.
/// Interval in seconds after which uncommited data would be
/// uploaded to the storage platform.
#[arg(
long,
env = "P_STORAGE_UPLOAD_INTERVAL",
Expand All @@ -196,64 +208,26 @@ pub struct Server {
)]
pub upload_interval: u64,

/// Optional username to enable basic auth on the server
/// Username for the basic authentication on the server
#[arg(
long,
env = USERNAME_ENV,
value_name = "username",
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_USERNAME)
)]
pub username: String,

/// Optional password to enable basic auth on the server
/// Password for the basic authentication on the server
#[arg(
long,
env = PASSWORD_ENV,
value_name = "password",
default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_PASSWORD)
)]
pub password: String,

#[command(subcommand)]
pub object_store: ObjectStore,

/// Run Parseable in demo mode with default credentials and open object store
#[arg(short, long, exclusive = true)]
pub demo: bool,
}

#[derive(Debug, Clone, Subcommand)]
pub enum ObjectStore {
Drive(FSConfig),
S3(S3Config),
}

impl ObjectStorageProvider for ObjectStore {
fn get_datafusion_runtime(&self) -> Arc<datafusion::execution::runtime_env::RuntimeEnv> {
match self {
ObjectStore::Drive(x) => x.get_datafusion_runtime(),
ObjectStore::S3(x) => x.get_datafusion_runtime(),
}
}

fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
match self {
ObjectStore::Drive(x) => x.get_object_store(),
ObjectStore::S3(x) => x.get_object_store(),
}
}

fn get_endpoint(&self) -> String {
match self {
ObjectStore::Drive(x) => x.get_endpoint(),
ObjectStore::S3(x) => x.get_endpoint(),
}
}
}

impl Server {
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_disk_path.join(stream_name)
self.local_staging_path.join(stream_name)
}

pub fn get_scheme(&self) -> String {
Expand Down
5 changes: 2 additions & 3 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ use std::path::PathBuf;
use std::sync::Arc;

use crate::option::CONFIG;
use crate::storage::ObjectStorage;
use crate::storage::ObjectStorageError;
use crate::storage::StorageDir;
use crate::storage::{self, ObjectStorageProvider};
use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
use crate::utils::TimePeriod;
use crate::validator;

Expand Down Expand Up @@ -68,7 +67,7 @@ impl Query {

/// Return prefixes, each per day/hour/minutes as necessary
pub fn get_prefixes(&self) -> Vec<String> {
TimePeriod::new(self.start, self.end, storage::OBJECT_STORE_DATA_GRANULARITY)
TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY)
.generate_prefixes(&self.stream_name)
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider}
about = "configuration for using local filesystem for storage"
)]
pub struct FSConfig {
#[arg(long, env = "P_FS_PATH", value_name = "path")]
#[arg(env = "P_FS_PATH", value_name = "filesystem path")]
root: PathBuf,
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub trait ObjectStorage: Sync + 'static {
}

async fn sync(&self) -> Result<(), MoveDataError> {
if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
if !Path::new(&CONFIG.staging_dir()).exists() {
return Ok(());
}

Expand Down
Loading