diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 1d0270a88..3498c8183 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -118,7 +118,12 @@ pub async fn update_snapshot( let mut ch = false; for m in manifests.iter() { let s = get_address(); - let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE); + let p = format!( + "{}.{}.{}", + s.domain().unwrap(), + s.port().unwrap_or_default(), + MANIFEST_FILE + ); if m.manifest_path.contains(&p) { ch = true; } @@ -152,7 +157,12 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap_or_default(), + MANIFEST_FILE + ); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage @@ -186,7 +196,12 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap(), + MANIFEST_FILE + ); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) diff --git a/server/src/cli.rs b/server/src/cli.rs index 41e03b39f..2ad9899cd 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -88,7 +88,7 @@ pub struct Cli { pub mode: Mode, /// public address for the parseable server ingestor - pub ingestor_url: String, + pub ingestor_endpoint: String, } impl Cli { @@ -115,7 +115,7 @@ impl Cli { pub const ROW_GROUP_SIZE: &'static str = "row-group-size"; pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo"; pub const MODE: &'static str = "mode"; - pub const INGESTOR_URL: &'static str = "ingestor-url"; + pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint"; pub const DEFAULT_USERNAME: &'static str = "admin"; pub const DEFAULT_PASSWORD: &'static str = "admin"; @@ -317,9 +317,9 @@ impl Cli { .help("Mode of operation"), ) .arg( - Arg::new(Self::INGESTOR_URL) - .long(Self::INGESTOR_URL) - .env("P_INGESTOR_URL") + Arg::new(Self::INGESTOR_ENDPOINT) + .long(Self::INGESTOR_ENDPOINT) + .env("P_INGESTOR_ENDPOINT") .value_name("URL") .required(false) .help("URL to connect to this specific ingestor. Default is the address of the server.") @@ -367,8 +367,8 @@ impl FromArgMatches for Cli { .cloned() .expect("default value for address"); - self.ingestor_url = m - .get_one::(Self::INGESTOR_URL) + self.ingestor_endpoint = m + .get_one::(Self::INGESTOR_ENDPOINT) .cloned() .unwrap_or_else(String::default); diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 0b565cee1..5309b76a3 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -182,7 +182,10 @@ impl IngestServer { let store = CONFIG.storage().get_object_store(); let sock = get_address(); - let path = ingestor_metadata_path(sock.ip().to_string(), sock.port().to_string()); + let path = ingestor_metadata_path( + sock.domain().unwrap().to_string(), + sock.port().unwrap_or_default().to_string(), + ); if store.get_object(&path).await.is_ok() { println!("ingestor metadata already exists"); @@ -191,13 +194,19 @@ impl IngestServer { let scheme = CONFIG.parseable.get_scheme(); let resource = IngestorMetadata::new( - sock.port().to_string(), + sock.port().unwrap_or_default().to_string(), CONFIG .parseable .domain_address .clone() .unwrap_or_else(|| { - Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap() + Url::parse(&format!( + "{}://{}:{}", + scheme, + sock.domain().unwrap(), + sock.port().unwrap_or_default() + )) + .unwrap() }) .to_string(), DEFAULT_VERSION.to_string(), diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index 6fb99405d..37a0445bf 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -23,7 +23,11 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { let socket = get_address(); - let address = format!("http://{}:{}", socket.ip(), socket.port()); + let address = format!( + "http://{}:{}", + socket.domain().unwrap(), + socket.port().unwrap_or_default() + ); Metrics { address, parseable_events_ingested: 0.0, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index c3acec8fa..573b60bb4 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -541,8 +541,8 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { let addr = get_address(); let file_name = format!( ".ingestor.{}.{}{}", - addr.ip(), - addr.port(), + addr.domain().unwrap(), + addr.port().unwrap_or_default(), SCHEMA_FILE_NAME ); @@ -561,8 +561,8 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { let addr = get_address(); let file_name = format!( ".ingestor.{}.{}{}", - addr.ip(), - addr.port(), + addr.domain().unwrap(), + addr.port().unwrap_or_default(), STREAM_METADATA_FILE_NAME ); RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) @@ -589,7 +589,12 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { #[inline(always)] fn manifest_path(prefix: &str) -> RelativePathBuf { let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap_or_default(), + MANIFEST_FILE + ); RelativePathBuf::from_iter([prefix, &mainfest_file_name]) } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index f2f42a901..b3637ce68 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -64,8 +64,8 @@ impl StorageDir { + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); let sock = get_address(); - let ip = sock.ip(); - let port = sock.port(); + let ip = sock.domain().unwrap(); + let port = sock.port().unwrap_or_default(); format!("{local_uri}{ip}.{port}.{extention}") } diff --git a/server/src/utils.rs b/server/src/utils.rs index d1cf6a155..987e41262 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -25,7 +25,9 @@ pub mod update; use crate::option::CONFIG; use chrono::{DateTime, NaiveDate, Timelike, Utc}; use std::env; +#[allow(unused_imports)] use std::net::SocketAddr; +use url::Url; #[allow(dead_code)] pub fn hostname() -> Option { @@ -224,32 +226,43 @@ impl TimePeriod { } } -#[inline(always)] -pub fn get_address() -> SocketAddr { - if CONFIG.parseable.ingestor_url.is_empty() { - CONFIG.parseable.address.parse::().unwrap() - } else { - let addr_from_env = CONFIG - .parseable - .ingestor_url - .split(':') - .collect::>(); - - let mut hostname = addr_from_env[0].to_string(); - let mut port = addr_from_env[1].to_string(); - if hostname.starts_with('$') { - let var_hostname = hostname[1..].to_string(); - hostname = get_from_env(&var_hostname); - } - if port.starts_with('$') { - let var_port = port[1..].to_string(); - port = get_from_env(&var_port); - } - format!("{}:{}", hostname, port) - .parse::() - .unwrap() +pub fn get_address() -> Url { + if CONFIG.parseable.ingestor_endpoint.is_empty() { + return format!( + "{}://{}", + CONFIG.parseable.get_scheme(), + CONFIG.parseable.address + ) + .parse::() // if the value was improperly set, this will panic before hand + .unwrap(); + } + let addr_from_env = CONFIG + .parseable + .ingestor_endpoint + .split(':') + .collect::>(); + + let mut hostname = addr_from_env[0].to_string(); + let mut port = addr_from_env[1].to_string(); + + // if the env var value fits the pattern $VAR_NAME:$VAR_NAME + // fetch the value from the specified env vars + if hostname.starts_with('$') { + let var_hostname = hostname[1..].to_string(); + hostname = get_from_env(&var_hostname); } + if !hostname.starts_with("http") { + hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname); + } + + if port.starts_with('$') { + let var_port = port[1..].to_string(); + port = get_from_env(&var_port); + } + format!("{}:{}", hostname, port).parse::().unwrap() } + +/// util fuction to fetch value from an env var fn get_from_env(var_to_fetch: &str) -> String { env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) }