Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions server/src/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn print_about(
eprint!(
"
{}
Version:\t\t\t\t\t\"v{}\"",
Version: \"v{}\"",
"About:".to_string().bold(),
current_version,
); // " " " "
Expand All @@ -103,8 +103,8 @@ pub fn print_about(

eprintln!(
"
Commit:\t\t\t\t\t\t\"{commit_hash}\"
Docs:\t\t\t\t\t\t\"https://logg.ing/docs\""
Commit: \"{commit_hash}\"
Docs: \"https://logg.ing/docs\""
);
}

Expand Down
40 changes: 20 additions & 20 deletions server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub struct Report {
server_mode: String,
version: String,
commit_hash: String,
active_ingesters: u64,
inactive_ingesters: u64,
active_ingestors: u64,
inactive_ingestors: u64,
stream_count: usize,
total_events_count: u64,
total_json_bytes: u64,
Expand All @@ -91,7 +91,7 @@ impl Report {
cpu_count = info.cpus().len();
mem_total = info.total_memory();
}
let ingester_metrics = fetch_ingesters_metrics().await;
let ingestor_metrics = fetch_ingestors_metrics().await;

Self {
deployment_id: storage::StorageMetadata::global().deployment_id,
Expand All @@ -106,12 +106,12 @@ impl Report {
server_mode: CONFIG.parseable.mode.to_string(),
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingesters: ingester_metrics.0,
inactive_ingesters: ingester_metrics.1,
stream_count: ingester_metrics.2,
total_events_count: ingester_metrics.3,
total_json_bytes: ingester_metrics.4,
total_parquet_bytes: ingester_metrics.5,
active_ingestors: ingestor_metrics.0,
inactive_ingestors: ingestor_metrics.1,
stream_count: ingestor_metrics.2,
total_events_count: ingestor_metrics.3,
total_json_bytes: ingestor_metrics.4,
total_parquet_bytes: ingestor_metrics.5,
metrics: build_metrics().await,
}
}
Expand All @@ -122,7 +122,7 @@ impl Report {
}
}

/// build the node metrics for the node ingester endpoint
/// build the node metrics for the node ingestor endpoint
pub async fn get_analytics(_: HttpRequest) -> impl Responder {
let json = NodeMetrics::build();
web::Json(json)
Expand All @@ -148,23 +148,23 @@ fn total_event_stats() -> (u64, u64, u64) {
(total_events, total_json_bytes, total_parquet_bytes)
}

async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) {
async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
let event_stats = total_event_stats();
let mut node_metrics =
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);

let mut vec = vec![];
let mut active_ingesters = 0u64;
let mut offline_ingesters = 0u64;
let mut active_ingestors = 0u64;
let mut offline_ingestors = 0u64;
if CONFIG.parseable.mode == Mode::Query {
// send analytics for ingest servers

// ingester infos should be valid here, if not some thing is wrong
let ingester_infos = cluster::get_ingester_info().await.unwrap();
// ingestor infos should be valid here, if not some thing is wrong
let ingestor_infos = cluster::get_ingestor_info().await.unwrap();

for im in ingester_infos {
for im in ingestor_infos {
if !check_liveness(&im.domain_name).await {
offline_ingesters += 1;
offline_ingestors += 1;
continue;
}

Expand All @@ -185,15 +185,15 @@ async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) {

let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
vec.push(data);
active_ingesters += 1;
active_ingestors += 1;
}

node_metrics.accumulate(&mut vec);
}

(
active_ingesters,
offline_ingesters,
active_ingestors,
offline_ingestors,
node_metrics.stream_count,
node_metrics.total_events_count,
node_metrics.total_json_bytes,
Expand Down
18 changes: 9 additions & 9 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
eprintln!(
"
{}
Address:\t\t\t\t\t{}
Credentials:\t\t\t\t\t{}
Server Mode:\t\t\t\t\t\"{}\"
LLM Status:\t\t\t\t\t\"{}\"",
Address: {}
Credentials: {}
Server Mode: \"{}\"
LLM Status: \"{}\"",
"Server:".to_string().bold(),
address,
credentials,
config.parseable.mode.to_str(),
config.get_server_mode_string(),
llm_status
);
}
Expand All @@ -101,8 +101,8 @@ async fn storage_info(config: &Config) {
eprintln!(
"
{}
Storage Mode:\t\t\t\t\t\"{}\"
Staging Path:\t\t\t\t\t\"{}\"",
Storage Mode: \"{}\"
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
Expand All @@ -116,7 +116,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Cache:\t\t\t\t\t\"{}\", (size: {})",
{:8}Cache: \"{}\", (size: {})",
"",
path.display(),
size
Expand All @@ -125,7 +125,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})",
{:8}Store: \"{}\", (latency: {:?})",
"",
storage.get_endpoint(),
latency
Expand Down
6 changes: 3 additions & 3 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub async fn update_snapshot(
let mut ch = false;
for m in manifests.iter() {
let s = get_address();
let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE);
let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
if m.manifest_path.contains(&p) {
ch = true;
}
Expand Down Expand Up @@ -152,7 +152,7 @@ pub async fn update_snapshot(
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let path =
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
Expand Down Expand Up @@ -186,7 +186,7 @@ pub async fn update_snapshot(
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
Expand Down
3 changes: 1 addition & 2 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ impl Cli {
.env("P_INGESTOR_URL")
.value_name("URL")
.required(false)
.value_parser(validation::socket_addr)
.help("URL to connect to this specific ingestor. Default is the address of the server.")
)
.arg(
Expand Down Expand Up @@ -371,7 +370,7 @@ impl FromArgMatches for Cli {
self.ingestor_url = m
.get_one::<String>(Self::INGESTOR_URL)
.cloned()
.unwrap_or_else(|| self.address.clone());
.unwrap_or_else(String::default);

self.local_staging_path = m
.get_one::<PathBuf>(Self::STAGING)
Expand Down
8 changes: 4 additions & 4 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use serde_json::Value;

use crate::option::CONFIG;

use self::{cluster::get_ingester_info, query::Query};
use self::{cluster::get_ingestor_info, query::Query};

pub(crate) mod about;
pub mod cluster;
Expand Down Expand Up @@ -94,10 +94,10 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch

/// unused for now, might need it later
#[allow(unused)]
pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingester
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
let mut res = vec![];
let ima = get_ingester_info().await.unwrap();
let ima = get_ingestor_info().await.unwrap();

for im in ima.iter() {
let uri = format!(
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn about() -> Json<serde_json::Value> {
let current_version = format!("v{}", current_release.released_version);
let commit = current_release.commit_hash;
let deployment_id = meta.deployment_id.to_string();
let mode = CONFIG.parseable.mode.to_str();
let mode = CONFIG.get_server_mode_string();
let staging = if CONFIG.parseable.mode == Mode::Query {
"".to_string()
} else {
Expand Down
Loading