Skip to content
Merged
2 changes: 1 addition & 1 deletion src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
config.options.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.options.hot_tier_storage_path {
Expand Down
72 changes: 71 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use clap::Parser;
use std::path::PathBuf;
use std::{env, fs, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -385,4 +385,74 @@ impl Options {
pub fn is_default_creds(&self) -> bool {
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
}

/// Path to staging directory, ensures that it exists or panics
pub fn staging_dir(&self) -> &PathBuf {
fs::create_dir_all(&self.local_staging_path)
.expect("Should be able to create dir if doesn't exist");

&self.local_staging_path
}

/// TODO: refactor and document
Copy link
Contributor Author

@de-sh de-sh Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another PR we can move this code to using Url over a regular String and being optional, which will significantly simplify and make things more readable

pub fn get_url(&self) -> Url {
if self.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}

let ingestor_endpoint = &self.ingestor_endpoint;

if ingestor_endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = env::var(&var_hostname).unwrap_or_default();

if hostname.is_empty() {
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
}
if hostname.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
} else {
hostname = format!("{}://{}", self.get_scheme(), hostname);
}
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = env::var(&var_port).unwrap_or_default();

if port.is_empty() {
panic!(
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
var_port
);
}
}

format!("{}://{}:{}", self.get_scheme(), hostname, port)
.parse::<Url>()
.expect("Valid URL")
}
}
2 changes: 1 addition & 1 deletion src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
let staging = if PARSEABLE.options.mode == Mode::Query {
"".to_string()
} else {
PARSEABLE.staging_dir().display().to_string()
PARSEABLE.options.staging_dir().display().to_string()
};
let grpc_port = PARSEABLE.options.grpc_port;

Expand Down
24 changes: 12 additions & 12 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::{
cli::Options,
oidc::Claims,
parseable::PARSEABLE,
storage::PARSEABLE_ROOT_DIRECTORY,
utils::{get_ingestor_id, get_url},
storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
utils::get_ingestor_id,
};

use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
Expand Down Expand Up @@ -213,18 +213,18 @@ impl IngestorMetadata {
}

/// Capture metadata information by either loading it from staging or starting fresh
pub fn load() -> Arc<Self> {
pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
// all the files should be in the staging directory root
let entries = std::fs::read_dir(&PARSEABLE.options.local_staging_path)
.expect("Couldn't read from file");
let url = get_url();
let entries =
std::fs::read_dir(&options.local_staging_path).expect("Couldn't read from file");
let url = options.get_url();
let port = url.port().unwrap_or(80).to_string();
let url = url.to_string();
let Options {
username, password, ..
} = PARSEABLE.options.as_ref();
let staging_path = PARSEABLE.staging_dir();
let flight_port = PARSEABLE.options.flight_port.to_string();
} = options;
let staging_path = &options.local_staging_path;
let flight_port = options.flight_port.to_string();

for entry in entries {
// cause the staging directory will have only one file with ingestor in the name
Expand All @@ -250,7 +250,7 @@ impl IngestorMetadata {
if obj.get("flight_port").is_none() {
obj.insert(
"flight_port".to_owned(),
Value::String(PARSEABLE.options.flight_port.to_string()),
Value::String(options.flight_port.to_string()),
);
}

Expand Down Expand Up @@ -291,7 +291,7 @@ impl IngestorMetadata {
}
}

let storage = PARSEABLE.storage.get_object_store();
let storage = storage.get_object_store();
let meta = Self::new(
port,
url,
Expand Down Expand Up @@ -342,7 +342,7 @@ impl IngestorMetadata {
let bytes = Bytes::from(serde_json::to_vec(&json)?);

let resource: IngestorMetadata = serde_json::from_value(json)?;
resource.put_on_disk(PARSEABLE.staging_dir())?;
resource.put_on_disk(PARSEABLE.options.staging_dir())?;

PARSEABLE
.storage
Expand Down
4 changes: 2 additions & 2 deletions src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::handlers::http::base_path_without_preceding_slash;
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::modal::IngestorMetadata;
use crate::utils::get_url;
use crate::parseable::PARSEABLE;
use crate::HTTP_CLIENT;
use actix_web::http::header;
use chrono::NaiveDateTime;
Expand Down Expand Up @@ -61,7 +61,7 @@ struct StorageMetrics {

impl Default for Metrics {
fn default() -> Self {
let url = get_url();
let url = PARSEABLE.options.get_url();
let address = format!(
"http://{}:{}",
url.domain()
Expand Down
5 changes: 3 additions & 2 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
}

pub fn get_staging_metadata(config: &Parseable) -> anyhow::Result<Option<serde_json::Value>> {
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.staging_dir());
let path =
RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.options.staging_dir());
let bytes = match std::fs::read(path) {
Ok(bytes) => bytes,
Err(err) => match err.kind() {
Expand All @@ -351,7 +352,7 @@ pub fn put_staging_metadata(
config: &Parseable,
metadata: &serde_json::Value,
) -> anyhow::Result<()> {
let path = config.staging_dir().join(".parseable.json");
let path = config.options.staging_dir().join(".parseable.json");
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
Expand Down
6 changes: 1 addition & 5 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Parseable {
storage: Arc<dyn ObjectStorageProvider>,
) -> Self {
let ingestor_metadata = match &options.mode {
Mode::Ingest => Some(IngestorMetadata::load()),
Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())),
_ => None,
};
Parseable {
Expand Down Expand Up @@ -217,10 +217,6 @@ impl Parseable {
self.storage.clone()
}

pub fn staging_dir(&self) -> &PathBuf {
&self.options.local_staging_path
}

pub fn hot_tier_dir(&self) -> &Option<PathBuf> {
&self.options.hot_tier_storage_path
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
}

async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
if !Path::new(&PARSEABLE.staging_dir()).exists() {
if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
return Ok(());
}

Expand Down
22 changes: 13 additions & 9 deletions src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Default for StorageMetadata {
Self {
version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
mode: PARSEABLE.storage.name().to_owned(),
staging: PARSEABLE.staging_dir().to_path_buf(),
staging: PARSEABLE.options.staging_dir().to_path_buf(),
storage: PARSEABLE.storage.get_endpoint(),
deployment_id: uid::gen(),
server_mode: PARSEABLE.options.mode,
Expand Down Expand Up @@ -134,8 +134,8 @@ pub async fn resolve_parseable_metadata(
if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest {
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
} else {
create_dir_all(PARSEABLE.staging_dir())?;
metadata.staging = PARSEABLE.staging_dir().canonicalize()?;
create_dir_all(PARSEABLE.options.staging_dir())?;
metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?;
// this flag is set to true so that metadata is copied to staging
overwrite_staging = true;
// overwrite remote in all and query mode
Expand All @@ -151,20 +151,20 @@ pub async fn resolve_parseable_metadata(
Mode::Query => {
overwrite_remote = true;
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.staging_dir().to_path_buf();
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
Mode::Ingest => {
// if ingest server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.staging_dir().to_path_buf();
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
}
Ok(metadata)
}
}
EnvChange::CreateBoth => {
create_dir_all(PARSEABLE.staging_dir())?;
create_dir_all(PARSEABLE.options.staging_dir())?;
let metadata = StorageMetadata::default();
// new metadata needs to be set
// if mode is query or all then both staging and remote
Expand Down Expand Up @@ -237,7 +237,8 @@ pub enum EnvChange {
}

pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(PARSEABLE.staging_dir());
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME)
.to_path(PARSEABLE.options.staging_dir());
let bytes = match fs::read(path) {
Ok(bytes) => bytes,
Err(err) => match err.kind() {
Expand All @@ -259,8 +260,11 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec
pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> {
let mut staging_metadata = meta.clone();
staging_metadata.server_mode = PARSEABLE.options.mode;
staging_metadata.staging = PARSEABLE.staging_dir().to_path_buf();
let path = PARSEABLE.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
staging_metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
let path = PARSEABLE
.options
.staging_dir()
.join(PARSEABLE_METADATA_FILE_NAME);
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
Expand Down
69 changes: 0 additions & 69 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,14 @@ pub mod uid;
pub mod update;

use crate::handlers::http::rbac::RBACError;
use crate::parseable::PARSEABLE;
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use actix::extract_session_key_from_req;
use actix_web::HttpRequest;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
use regex::Regex;
use sha2::{Digest, Sha256};
use std::env;
use tracing::debug;
use url::Url;

/// Convert minutes to a slot range
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
Expand All @@ -55,72 +52,6 @@ pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
Some(format!("{block_start:02}-{block_end:02}"))
}

pub fn get_url() -> Url {
if PARSEABLE.options.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
PARSEABLE.options.get_scheme(),
PARSEABLE.options.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", PARSEABLE.options.address)
});
}

let ingestor_endpoint = &PARSEABLE.options.ingestor_endpoint;

if ingestor_endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = get_from_env(&var_hostname);

if hostname.is_empty() {
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
}
if hostname.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
} else {
hostname = format!("{}://{}", PARSEABLE.options.get_scheme(), hostname);
}
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = get_from_env(&var_port);

if port.is_empty() {
panic!(
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
var_port
);
}
}

format!("{}://{}:{}", PARSEABLE.options.get_scheme(), hostname, port)
.parse::<Url>()
.expect("Valid URL")
}

/// util fuction to fetch value from an env var
fn get_from_env(var_to_fetch: &str) -> String {
env::var(var_to_fetch).unwrap_or_else(|_| "".to_string())
}

pub fn get_ingestor_id() -> String {
let now = Utc::now().to_rfc3339();
let id = get_hash(&now).to_string().split_at(15).0.to_string();
Expand Down
Loading