Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,52 @@
use crossterm::style::Stylize;
use sysinfo::{System, SystemExt};

use crate::{option::Config, utils::capitalize_ascii};

pub fn print(config: &Config) {
let scheme = config.parseable.get_scheme();
status_info(config, &scheme);
version::print();
storage_info(config);
system_info();
println!();
}

fn status_info(config: &Config, scheme: &str) {
let url = format!("{}://{}", scheme, config.parseable.address).underlined();
eprintln!(
"
{}
{}
{}",
format!("Parseable server started at: {}", url).bold(),
format!("Username: {}", config.parseable.username).bold(),
format!("Password: {}", config.parseable.password).bold(),
);

if config.is_default_creds() {
warning_line();
eprintln!(
"
{}",
"Using default credentials for Parseable server".red()
)
}
}

fn storage_info(config: &Config) {
eprintln!(
"
{}
Local Staging Path: {}
{} Storage: {}",
"Storage:".to_string().blue().bold(),
config.staging_dir().to_string_lossy(),
capitalize_ascii(config.storage_name),
config.storage().get_endpoint(),
)
}

pub fn system_info() {
let system = System::new_all();
eprintln!(
Expand All @@ -36,7 +82,6 @@ pub fn system_info() {
)
}

#[allow(dead_code)]
pub fn warning_line() {
eprint!(
"
Expand Down Expand Up @@ -81,13 +126,10 @@ pub mod version {
current_version
);

// check for latest release, if it cannot be fetched then print error as warn and return
let latest_release = match update::get_latest() {
Ok(latest_release) => latest_release,
Err(e) => {
log::warn!("{}", e);
return;
}
// check for latest release
let Ok(latest_release) = update::get_latest() else {
eprintln!();
return
};

if latest_release.version > current_version {
Expand All @@ -109,7 +151,7 @@ pub mod version {
}
}
ParseableVersion::Prerelease(current_prerelease) => {
eprint!(
eprintln!(
"
{} {} ",
"Current Version:".to_string().blue().bold(),
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const API_VERSION: &str = "v1";
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
CONFIG.print();
banner::print(&CONFIG);
CONFIG.validate();
let storage = CONFIG.storage().get_object_store();
CONFIG.validate_storage(&*storage).await;
Expand Down
79 changes: 37 additions & 42 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@
*
*/

use clap::error::ErrorKind;
use clap::{command, value_parser, Arg, Args, Command, FromArgMatches};
use crossterm::style::Stylize;

use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::banner;
use crate::storage::{
FSConfig, ObjectStorage, ObjectStorageError, ObjectStorageProvider, S3Config,
LOCAL_SYNC_INTERVAL,
};
use crate::utils::capitalize_ascii;

lazy_static::lazy_static! {
#[derive(Debug)]
Expand Down Expand Up @@ -54,6 +53,15 @@ impl Config {
Err(err) => err.exit(),
};

if server.local_staging_path == storage.root {
parseable_cli_command()
.error(
ErrorKind::ValueValidation,
"Cannot use same path for storage and staging",
)
.exit()
}

Config {
parseable: server,
storage: Arc::new(storage),
Expand All @@ -80,15 +88,6 @@ impl Config {
}
}

pub fn print(&self) {
let scheme = CONFIG.parseable.get_scheme();
self.status_info(&scheme);
banner::version::print();
self.storage_info();
banner::system_info();
println!();
}

pub fn validate(&self) {
if CONFIG.parseable.upload_interval < LOCAL_SYNC_INTERVAL {
panic!("object storage upload_interval (P_STORAGE_UPLOAD_INTERVAL) must be 60 seconds or more");
Expand All @@ -113,39 +112,18 @@ impl Config {
}
}

fn status_info(&self, scheme: &str) {
let url = format!("{}://{}", scheme, self.parseable.address).underlined();
eprintln!(
"
{}
{}
{}",
format!("Parseable server started at: {}", url).bold(),
format!("Username: {}", self.parseable.username).bold(),
format!("Password: {}", self.parseable.password).bold(),
)
}

fn storage_info(&self) {
eprintln!(
"
{}
Local Staging Path: {}
{} Storage: {}",
"Storage:".to_string().blue().bold(),
self.staging_dir().to_string_lossy(),
capitalize_ascii(self.storage_name),
self.storage().get_endpoint(),
)
}

pub fn storage(&self) -> Arc<dyn ObjectStorageProvider + Send + Sync> {
self.storage.clone()
}

pub fn staging_dir(&self) -> &Path {
&self.parseable.local_staging_path
}

pub fn is_default_creds(&self) -> bool {
self.parseable.username == Server::DEFAULT_USERNAME
&& self.parseable.password == Server::DEFAULT_PASSWORD
}
}

impl Default for Config {
Expand All @@ -160,10 +138,10 @@ fn parseable_cli_command() -> Command {

let local = local
.mut_arg(Server::USERNAME, |arg| {
arg.required(false).default_value("admin")
arg.required(false).default_value(Server::DEFAULT_USERNAME)
})
.mut_arg(Server::PASSWORD, |arg| {
arg.required(false).default_value("admin")
arg.required(false).default_value(Server::DEFAULT_PASSWORD)
});

let s3 = Server::get_clap_command("--s3-store");
Expand Down Expand Up @@ -259,6 +237,8 @@ impl Server {
pub const UPLOAD_INTERVAL: &str = "upload-interval";
pub const USERNAME: &str = "username";
pub const PASSWORD: &str = "password";
pub const DEFAULT_USERNAME: &str = "admin";
pub const DEFAULT_PASSWORD: &str = "admin";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
Expand Down Expand Up @@ -305,7 +285,7 @@ impl Server {
.env("P_STAGING_DIR")
.value_name("DIR")
.default_value("./staging")
.value_parser(value_parser!(PathBuf))
.value_parser(validation::canonicalize_path)
.help("The local staging path is used as a temporary landing point for incoming events and local cache")
.next_line_help(true),
)
Expand Down Expand Up @@ -339,7 +319,11 @@ impl Server {
}

pub mod validation {
use std::{net::ToSocketAddrs, path::PathBuf};
use std::{
fs::{canonicalize, create_dir_all},
net::ToSocketAddrs,
path::PathBuf,
};

pub fn file_path(s: &str) -> Result<PathBuf, String> {
if s.is_empty() {
Expand All @@ -355,6 +339,17 @@ pub mod validation {
Ok(path)
}

pub fn canonicalize_path(s: &str) -> Result<PathBuf, String> {
let path = PathBuf::from(s);

create_dir_all(&path)
.map_err(|err| err.to_string())
.and_then(|_| {
canonicalize(&path)
.map_err(|_| "Cannot use the path provided as an absolute path".to_string())
})
}

pub fn socket_addr(s: &str) -> Result<String, String> {
s.to_socket_addrs()
.is_ok()
Expand Down
7 changes: 4 additions & 3 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use relative_path::RelativePath;
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;

use crate::query::Query;
use crate::{option::validation, query::Query};

use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};

Expand All @@ -53,9 +53,10 @@ pub struct FSConfig {
#[arg(
env = "P_FS_PATH",
value_name = "filesystem path",
default_value = "./data"
default_value = "./data",
value_parser = validation::canonicalize_path
)]
root: PathBuf,
pub root: PathBuf,
}

impl ObjectStorageProvider for FSConfig {
Expand Down