Skip to content
Merged
2 changes: 1 addition & 1 deletion src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
config.options.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.options.hot_tier_storage_path {
Expand Down
72 changes: 71 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use clap::Parser;
use std::path::PathBuf;
use std::{env, fs, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -385,4 +385,74 @@ impl Options {
/// Reports whether the configured username and password both equal
/// `DEFAULT_USERNAME` and `DEFAULT_PASSWORD` respectively.
pub fn is_default_creds(&self) -> bool {
    let username_is_default = self.username == DEFAULT_USERNAME;
    let password_is_default = self.password == DEFAULT_PASSWORD;

    username_is_default && password_is_default
}

/// Returns the local staging directory, creating it (and any missing parents) first.
///
/// # Panics
///
/// Panics if the directory cannot be created.
pub fn staging_dir(&self) -> &PathBuf {
    let dir = &self.local_staging_path;
    fs::create_dir_all(dir).expect("Should be able to create dir if doesn't exist");

    dir
}

/// TODO: refactor and document
Copy link
Contributor Author

@de-sh de-sh Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another PR we can change this to use an optional `Url` instead of a plain `String`, which will significantly simplify the code and make it more readable.

///
/// Builds the URL at which this server can be reached.
///
/// With an empty `ingestor_endpoint` the result is `<scheme>://<address>`. Otherwise
/// `ingestor_endpoint` must look like `<host>:<port>` with no scheme; a half written as
/// `$VAR_NAME` is replaced by the value of that environment variable.
///
/// # Panics
///
/// Panics if `address` does not parse as a URL, if `ingestor_endpoint` (or a hostname read
/// from the environment) starts with `http`, if `ingestor_endpoint` is not exactly
/// `<host>:<port>`, if a referenced environment variable is unset or empty, or if the
/// assembled string is not a valid URL.
pub fn get_url(&self) -> Url {
    if self.ingestor_endpoint.is_empty() {
        return format!("{}://{}", self.get_scheme(), self.address)
            .parse::<Url>() // if the value was improperly set, this will panic before hand
            .unwrap_or_else(|err| {
                panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
            });
    }

    let ingestor_endpoint = &self.ingestor_endpoint;

    // The endpoint must carry no scheme and consist of exactly two `:`-separated parts.
    // Both violations are reported with the same message.
    let parts: Vec<&str> = ingestor_endpoint.split(':').collect();
    let (host_part, port_part) = match parts.as_slice() {
        [host, port] if !ingestor_endpoint.starts_with("http") => (*host, *port),
        _ => panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint),
    };

    // if the env var value fits the pattern $VAR_NAME:$VAR_NAME
    // fetch the value from the specified env vars
    let hostname = match host_part.strip_prefix('$') {
        Some(var_hostname) => {
            let value = env::var(var_hostname).unwrap_or_default();

            if value.is_empty() {
                panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
            }
            if value.starts_with("http") {
                panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", value, var_hostname);
            }

            // The scheme is prepended exactly once, when the final URL is assembled below.
            // Prefixing it here as well would produce `scheme://scheme://host:port`.
            value
        }
        None => host_part.to_string(),
    };

    let port = match port_part.strip_prefix('$') {
        Some(var_port) => {
            let value = env::var(var_port).unwrap_or_default();

            if value.is_empty() {
                panic!(
                    "Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
                    var_port
                );
            }

            value
        }
        None => port_part.to_string(),
    };

    format!("{}://{}:{}", self.get_scheme(), hostname, port)
        .parse::<Url>()
        .expect("Valid URL")
}
}
2 changes: 1 addition & 1 deletion src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
let staging = if PARSEABLE.options.mode == Mode::Query {
"".to_string()
} else {
PARSEABLE.staging_dir().display().to_string()
PARSEABLE.options.staging_dir().display().to_string()
};
let grpc_port = PARSEABLE.options.grpc_port;

Expand Down
162 changes: 79 additions & 83 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use std::{path::Path, sync::Arc};
use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use base64::Engine;
use base64::{prelude::BASE64_STANDARD, Engine};
use bytes::Bytes;
use openid::Discovered;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{Map, Value};
use ssl_acceptor::get_ssl_acceptor;
use tokio::sync::oneshot;
use tracing::{error, info, warn};
Expand All @@ -35,8 +35,8 @@ use crate::{
cli::Options,
oidc::Claims,
parseable::PARSEABLE,
storage::PARSEABLE_ROOT_DIRECTORY,
utils::{get_ingestor_id, get_url},
storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
utils::get_ingestor_id,
};

use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
Expand Down Expand Up @@ -213,85 +213,71 @@ impl IngestorMetadata {
}

/// Capture metadata information by either loading it from staging or starting fresh
pub fn load() -> Arc<Self> {
pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
// all the files should be in the staging directory root
let entries = std::fs::read_dir(&PARSEABLE.options.local_staging_path)
let entries = options
.staging_dir()
.read_dir()
.expect("Couldn't read from file");
let url = get_url();
let url = options.get_url();
let port = url.port().unwrap_or(80).to_string();
let url = url.to_string();
let Options {
username, password, ..
} = PARSEABLE.options.as_ref();
let staging_path = PARSEABLE.staging_dir();
let flight_port = PARSEABLE.options.flight_port.to_string();
} = options;
let staging_path = options.staging_dir();
let flight_port = options.flight_port.to_string();

for entry in entries {
// cause the staging directory will have only one file with ingestor in the name
// so the JSON Parse should not error unless the file is corrupted
let path = entry.expect("Should be a directory entry").path();
let flag = path
if !path
.file_name()
.unwrap_or_default()
.to_str()
.unwrap_or_default()
.contains("ingestor");

if flag {
// get the ingestor metadata from staging
let text = std::fs::read(path).expect("File should be present");
let mut meta: Value = serde_json::from_slice(&text).expect("Valid JSON");

// migrate the staging meta
let obj = meta
.as_object_mut()
.expect("Could Not parse Ingestor Metadata Json");

if obj.get("flight_port").is_none() {
obj.insert(
"flight_port".to_owned(),
Value::String(PARSEABLE.options.flight_port.to_string()),
);
}

let mut meta: IngestorMetadata =
serde_json::from_value(meta).expect("Couldn't write to disk");

// compare url endpoint and port
if meta.domain_name != url {
info!(
"Domain Name was Updated. Old: {} New: {}",
meta.domain_name, url
);
meta.domain_name = url;
}

if meta.port != port {
info!("Port was Updated. Old: {} New: {}", meta.port, port);
meta.port = port;
}

let token =
base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));

let token = format!("Basic {}", token);

if meta.token != token {
// TODO: Update the message to be more informative with username and password
info!(
"Credentials were Updated. Old: {} New: {}",
meta.token, token
);
meta.token = token;
}

meta.put_on_disk(staging_path)
.expect("Couldn't write to disk");
return Arc::new(meta);
.and_then(|s| s.to_str())
.is_some_and(|s| s.contains("ingestor"))
{
continue;
}

// get the ingestor metadata from staging
let bytes = std::fs::read(path).expect("File should be present");
let mut meta =
Self::from_bytes(&bytes, options.flight_port).expect("Extracted ingestor metadata");

// compare url endpoint and port, update
if meta.domain_name != url {
info!(
"Domain Name was Updated. Old: {} New: {}",
meta.domain_name, url
);
meta.domain_name = url;
}

if meta.port != port {
info!("Port was Updated. Old: {} New: {}", meta.port, port);
meta.port = port;
}

let token = format!(
"Basic {}",
BASE64_STANDARD.encode(format!("{username}:{password}"))
);
if meta.token != token {
// TODO: Update the message to be more informative with username and password
warn!(
"Credentials were Updated. Tokens updated; Old: {} New: {}",
meta.token, token
);
meta.token = token;
}
meta.put_on_disk(staging_path)
.expect("Couldn't write to disk");

return Arc::new(meta);
}

let storage = PARSEABLE.storage.get_object_store();
let storage = storage.get_object_store();
let meta = Self::new(
port,
url,
Expand Down Expand Up @@ -319,6 +305,15 @@ impl IngestorMetadata {
])
}

/// Deserializes ingestor metadata from raw JSON bytes.
///
/// The bytes must hold a JSON object. When that object has no `flight_port` key, one is
/// inserted holding `flight_port` rendered as a string; an existing value is left untouched.
///
/// # Errors
///
/// Returns an error if the bytes are not a JSON object or the object cannot be
/// deserialized into `Self`.
fn from_bytes(bytes: &[u8], flight_port: u16) -> anyhow::Result<Self> {
    let mut fields: Map<String, Value> = serde_json::from_slice(bytes)?;

    if !fields.contains_key("flight_port") {
        fields.insert(
            "flight_port".to_owned(),
            Value::String(flight_port.to_string()),
        );
    }

    let meta: Self = serde_json::from_value(Value::Object(fields))?;
    Ok(meta)
}

pub async fn migrate(&self) -> anyhow::Result<Option<IngestorMetadata>> {
let imp = self.file_path();
let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await {
Expand All @@ -327,22 +322,11 @@ impl IngestorMetadata {
return Ok(None);
}
};
let mut json = serde_json::from_slice::<Value>(&bytes)?;
let meta = json
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("Unable to parse Ingester Metadata"))?;
let fp = meta.get("flight_port");

if fp.is_none() {
meta.insert(
"flight_port".to_owned(),
Value::String(PARSEABLE.options.flight_port.to_string()),
);
}
let bytes = Bytes::from(serde_json::to_vec(&json)?);

let resource: IngestorMetadata = serde_json::from_value(json)?;
resource.put_on_disk(PARSEABLE.staging_dir())?;
let resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?;
let bytes = Bytes::from(serde_json::to_vec(&resource)?);

resource.put_on_disk(PARSEABLE.options.staging_dir())?;

PARSEABLE
.storage
Expand Down Expand Up @@ -394,6 +378,18 @@ mod test {
assert_eq!(rhs, lhs);
}

/// A `flight_port` already present in the JSON (even an empty one) is kept as-is
/// rather than being replaced by the fallback port.
#[test]
fn from_bytes_with_port() {
    let stored = br#"{"version": "", "port": "", "domain_name": "", "bucket_name": "", "token": "", "ingestor_id": "", "flight_port": ""}"#;

    let parsed = IngestorMetadata::from_bytes(stored, 10).expect("Deserializable");

    assert_eq!(parsed.flight_port, "");
}

/// When the JSON has no `flight_port` key, the fallback port is filled in as a string.
#[test]
fn from_bytes_without_port() {
    let stored = br#"{"version": "", "port": "", "domain_name": "", "bucket_name": "", "token": "", "ingestor_id": ""}"#;

    let parsed = IngestorMetadata::from_bytes(stored, 10).expect("Deserializable");

    assert_eq!(parsed.flight_port, "10");
}

#[rstest]
fn test_serialize_resource() {
let im = IngestorMetadata::new(
Expand Down
4 changes: 2 additions & 2 deletions src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::handlers::http::base_path_without_preceding_slash;
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::modal::IngestorMetadata;
use crate::utils::get_url;
use crate::parseable::PARSEABLE;
use crate::HTTP_CLIENT;
use actix_web::http::header;
use chrono::NaiveDateTime;
Expand Down Expand Up @@ -61,7 +61,7 @@ struct StorageMetrics {

impl Default for Metrics {
fn default() -> Self {
let url = get_url();
let url = PARSEABLE.options.get_url();
let address = format!(
"http://{}:{}",
url.domain()
Expand Down
5 changes: 3 additions & 2 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
}

pub fn get_staging_metadata(config: &Parseable) -> anyhow::Result<Option<serde_json::Value>> {
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.staging_dir());
let path =
RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.options.staging_dir());
let bytes = match std::fs::read(path) {
Ok(bytes) => bytes,
Err(err) => match err.kind() {
Expand All @@ -351,7 +352,7 @@ pub fn put_staging_metadata(
config: &Parseable,
metadata: &serde_json::Value,
) -> anyhow::Result<()> {
let path = config.staging_dir().join(".parseable.json");
let path = config.options.staging_dir().join(".parseable.json");
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
Expand Down
8 changes: 2 additions & 6 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub const STREAM_EXISTS: &str = "Stream exists";
/// Shared state of the Parseable server.
pub static PARSEABLE: Lazy<Parseable> = Lazy::new(|| match Cli::parse().storage {
StorageOptions::Local(args) => {
if args.options.local_staging_path == args.storage.root {
if args.options.staging_dir() == &args.storage.root {
clap::Error::raw(
ErrorKind::ValueValidation,
"Cannot use same path for storage and staging",
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Parseable {
storage: Arc<dyn ObjectStorageProvider>,
) -> Self {
let ingestor_metadata = match &options.mode {
Mode::Ingest => Some(IngestorMetadata::load()),
Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())),
_ => None,
};
Parseable {
Expand Down Expand Up @@ -217,10 +217,6 @@ impl Parseable {
self.storage.clone()
}

/// Returns a reference to `options.local_staging_path`.
///
/// This accessor only hands back the stored path; it performs no filesystem work.
pub fn staging_dir(&self) -> &PathBuf {
&self.options.local_staging_path
}

/// Returns a reference to `options.hot_tier_storage_path`, which is `None` when no
/// hot tier path has been set.
pub fn hot_tier_dir(&self) -> &Option<PathBuf> {
&self.options.hot_tier_storage_path
}
Expand Down
Loading
Loading