From 4390045f246fe49b64e8b6d48f2db72fbed5f118 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 11 Apr 2024 11:53:17 +0530 Subject: [PATCH 1/4] update get_address to use url instead of sockaddr --- server/src/catalog.rs | 21 ++++++++++++++++--- .../src/handlers/http/modal/ingest_server.rs | 15 ++++++++++--- server/src/metrics/prom_utils.rs | 6 +++++- server/src/storage/object_storage.rs | 15 ++++++++----- server/src/storage/staging.rs | 4 ++-- server/src/utils.rs | 11 +++++----- 6 files changed, 52 insertions(+), 20 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 1d0270a88..3498c8183 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -118,7 +118,12 @@ pub async fn update_snapshot( let mut ch = false; for m in manifests.iter() { let s = get_address(); - let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE); + let p = format!( + "{}.{}.{}", + s.domain().unwrap(), + s.port().unwrap_or_default(), + MANIFEST_FILE + ); if m.manifest_path.contains(&p) { ch = true; } @@ -152,7 +157,12 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap_or_default(), + MANIFEST_FILE + ); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage @@ -186,7 +196,12 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap(), + MANIFEST_FILE + ); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs 
index 0b565cee1..5309b76a3 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -182,7 +182,10 @@ impl IngestServer { let store = CONFIG.storage().get_object_store(); let sock = get_address(); - let path = ingestor_metadata_path(sock.ip().to_string(), sock.port().to_string()); + let path = ingestor_metadata_path( + sock.domain().unwrap().to_string(), + sock.port().unwrap_or_default().to_string(), + ); if store.get_object(&path).await.is_ok() { println!("ingestor metadata already exists"); @@ -191,13 +194,19 @@ impl IngestServer { let scheme = CONFIG.parseable.get_scheme(); let resource = IngestorMetadata::new( - sock.port().to_string(), + sock.port().unwrap_or_default().to_string(), CONFIG .parseable .domain_address .clone() .unwrap_or_else(|| { - Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap() + Url::parse(&format!( + "{}://{}:{}", + scheme, + sock.domain().unwrap(), + sock.port().unwrap_or_default() + )) + .unwrap() }) .to_string(), DEFAULT_VERSION.to_string(), diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index 6fb99405d..37a0445bf 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -23,7 +23,11 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { let socket = get_address(); - let address = format!("http://{}:{}", socket.ip(), socket.port()); + let address = format!( + "http://{}:{}", + socket.domain().unwrap(), + socket.port().unwrap_or_default() + ); Metrics { address, parseable_events_ingested: 0.0, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index c3acec8fa..573b60bb4 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -541,8 +541,8 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { let addr = get_address(); let file_name = format!( ".ingestor.{}.{}{}", - addr.ip(), - 
addr.port(), + addr.domain().unwrap(), + addr.port().unwrap_or_default(), SCHEMA_FILE_NAME ); @@ -561,8 +561,8 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { let addr = get_address(); let file_name = format!( ".ingestor.{}.{}{}", - addr.ip(), - addr.port(), + addr.domain().unwrap(), + addr.port().unwrap_or_default(), STREAM_METADATA_FILE_NAME ); RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) @@ -589,7 +589,12 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { #[inline(always)] fn manifest_path(prefix: &str) -> RelativePathBuf { let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap_or_default(), + MANIFEST_FILE + ); RelativePathBuf::from_iter([prefix, &mainfest_file_name]) } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index f2f42a901..b3637ce68 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -64,8 +64,8 @@ impl StorageDir { + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); let sock = get_address(); - let ip = sock.ip(); - let port = sock.port(); + let ip = sock.domain().unwrap(); + let port = sock.port().unwrap_or_default(); format!("{local_uri}{ip}.{port}.{extention}") } diff --git a/server/src/utils.rs b/server/src/utils.rs index d1cf6a155..015553b1a 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -25,7 +25,9 @@ pub mod update; use crate::option::CONFIG; use chrono::{DateTime, NaiveDate, Timelike, Utc}; use std::env; +#[allow(unused_imports)] use std::net::SocketAddr; +use url::Url; #[allow(dead_code)] pub fn hostname() -> Option { @@ -224,10 +226,9 @@ impl TimePeriod { } } -#[inline(always)] -pub fn get_address() -> SocketAddr { +pub fn get_address() -> Url { if 
CONFIG.parseable.ingestor_url.is_empty() { - CONFIG.parseable.address.parse::().unwrap() + CONFIG.parseable.address.parse::().unwrap() } else { let addr_from_env = CONFIG .parseable @@ -245,9 +246,7 @@ pub fn get_address() -> SocketAddr { let var_port = port[1..].to_string(); port = get_from_env(&var_port); } - format!("{}:{}", hostname, port) - .parse::() - .unwrap() + format!("{}:{}", hostname, port).parse::().unwrap() } } fn get_from_env(var_to_fetch: &str) -> String { From 8c43ae18081f671766a744bfb1c7a52144c9b2a6 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 11 Apr 2024 12:11:58 +0530 Subject: [PATCH 2/4] fix relative url with base --- server/src/utils.rs | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/server/src/utils.rs b/server/src/utils.rs index 015553b1a..316ece654 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -226,29 +226,38 @@ impl TimePeriod { } } +// TODO: CLEAN UP pub fn get_address() -> Url { if CONFIG.parseable.ingestor_url.is_empty() { - CONFIG.parseable.address.parse::().unwrap() - } else { - let addr_from_env = CONFIG - .parseable - .ingestor_url - .split(':') - .collect::>(); - - let mut hostname = addr_from_env[0].to_string(); - let mut port = addr_from_env[1].to_string(); - if hostname.starts_with('$') { - let var_hostname = hostname[1..].to_string(); - hostname = get_from_env(&var_hostname); - } - if port.starts_with('$') { - let var_port = port[1..].to_string(); - port = get_from_env(&var_port); + let url = format!( + "{}://{}", + CONFIG.parseable.get_scheme(), + CONFIG.parseable.address + ); + return url.parse::().unwrap(); + } + let addr_from_env = CONFIG + .parseable + .ingestor_url + .split(':') + .collect::>(); + + let mut hostname = addr_from_env[0].to_string(); + let mut port = addr_from_env[1].to_string(); + if hostname.starts_with('$') { + let var_hostname = hostname[1..].to_string(); + hostname = get_from_env(&var_hostname); + if 
!hostname.starts_with("http") { + hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname); } - format!("{}:{}", hostname, port).parse::().unwrap() } + if port.starts_with('$') { + let var_port = port[1..].to_string(); + port = get_from_env(&var_port); + } + format!("{}:{}", hostname, port).parse::().unwrap() } + fn get_from_env(var_to_fetch: &str) -> String { env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) } From 60189757f90c2110d571b2f179933ff7a962fe0e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 11 Apr 2024 12:49:57 +0530 Subject: [PATCH 3/4] fix to check scheme for hostname --- server/src/utils.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/utils.rs b/server/src/utils.rs index 316ece654..5dd0bad36 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -247,10 +247,11 @@ pub fn get_address() -> Url { if hostname.starts_with('$') { let var_hostname = hostname[1..].to_string(); hostname = get_from_env(&var_hostname); - if !hostname.starts_with("http") { - hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname); - } } + if !hostname.starts_with("http") { + hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname); + } + if port.starts_with('$') { let var_port = port[1..].to_string(); port = get_from_env(&var_port); From acd8afbdb1839dcb4c93c5928d59c7cfab984d83 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 15 Apr 2024 13:34:54 +0530 Subject: [PATCH 4/4] rename ingestor_url to ingestor_endpoint --- server/src/cli.rs | 14 +++++++------- server/src/utils.rs | 16 ++++++++++------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 41e03b39f..2ad9899cd 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -88,7 +88,7 @@ pub struct Cli { pub mode: Mode, /// public address for the parseable server ingestor - pub ingestor_url: String, + pub ingestor_endpoint: String, } impl Cli { @@ -115,7 
+115,7 @@ impl Cli { pub const ROW_GROUP_SIZE: &'static str = "row-group-size"; pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo"; pub const MODE: &'static str = "mode"; - pub const INGESTOR_URL: &'static str = "ingestor-url"; + pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint"; pub const DEFAULT_USERNAME: &'static str = "admin"; pub const DEFAULT_PASSWORD: &'static str = "admin"; @@ -317,9 +317,9 @@ impl Cli { .help("Mode of operation"), ) .arg( - Arg::new(Self::INGESTOR_URL) - .long(Self::INGESTOR_URL) - .env("P_INGESTOR_URL") + Arg::new(Self::INGESTOR_ENDPOINT) + .long(Self::INGESTOR_ENDPOINT) + .env("P_INGESTOR_ENDPOINT") .value_name("URL") .required(false) .help("URL to connect to this specific ingestor. Default is the address of the server.") @@ -367,8 +367,8 @@ impl FromArgMatches for Cli { .cloned() .expect("default value for address"); - self.ingestor_url = m - .get_one::(Self::INGESTOR_URL) + self.ingestor_endpoint = m + .get_one::(Self::INGESTOR_ENDPOINT) .cloned() .unwrap_or_else(String::default); diff --git a/server/src/utils.rs b/server/src/utils.rs index 5dd0bad36..987e41262 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -226,24 +226,27 @@ impl TimePeriod { } } -// TODO: CLEAN UP pub fn get_address() -> Url { - if CONFIG.parseable.ingestor_url.is_empty() { - let url = format!( + if CONFIG.parseable.ingestor_endpoint.is_empty() { + return format!( "{}://{}", CONFIG.parseable.get_scheme(), CONFIG.parseable.address - ); - return url.parse::().unwrap(); + ) + .parse::() // if the value was improperly set, this will panic before hand + .unwrap(); } let addr_from_env = CONFIG .parseable - .ingestor_url + .ingestor_endpoint .split(':') .collect::>(); let mut hostname = addr_from_env[0].to_string(); let mut port = addr_from_env[1].to_string(); + + // if the env var value fits the pattern $VAR_NAME:$VAR_NAME + // fetch the value from the specified env vars if hostname.starts_with('$') { let var_hostname = 
hostname[1..].to_string(); hostname = get_from_env(&var_hostname); @@ -259,6 +262,7 @@ pub fn get_address() -> Url { format!("{}:{}", hostname, port).parse::().unwrap() } +/// util function to fetch value from an env var fn get_from_env(var_to_fetch: &str) -> String { env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) }