Skip to content

Commit 0d5f1fd

Browse files
multiple fixes on server (#753)
1. fixed banner spacing 2. modified server mode: All to Standalone, Ingest to Distributed (Ingest), Query to Distributed (Query) 3. updated server mode in about API response 4. updated logic for env var P_INGESTOR_URL to use HOSTNAME and PORT from env 5. remove put cache api from querier 6. added put cache api to ingestor 7. renamed ingester to ingestor 8. corrected cache flow for ingestors and standalone 9. removed query, other logstream apis for ingestors
1 parent 32fa2bc commit 0d5f1fd

File tree

25 files changed

+280
-257
lines changed

25 files changed

+280
-257
lines changed

server/src/about.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub fn print_about(
9090
eprint!(
9191
"
9292
{}
93-
Version:\t\t\t\t\t\"v{}\"",
93+
Version: \"v{}\"",
9494
"About:".to_string().bold(),
9595
current_version,
9696
); // " " " "
@@ -103,8 +103,8 @@ pub fn print_about(
103103

104104
eprintln!(
105105
"
106-
Commit:\t\t\t\t\t\t\"{commit_hash}\"
107-
Docs:\t\t\t\t\t\t\"https://logg.ing/docs\""
106+
Commit: \"{commit_hash}\"
107+
Docs: \"https://logg.ing/docs\""
108108
);
109109
}
110110

server/src/analytics.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ pub struct Report {
6464
server_mode: String,
6565
version: String,
6666
commit_hash: String,
67-
active_ingesters: u64,
68-
inactive_ingesters: u64,
67+
active_ingestors: u64,
68+
inactive_ingestors: u64,
6969
stream_count: usize,
7070
total_events_count: u64,
7171
total_json_bytes: u64,
@@ -91,7 +91,7 @@ impl Report {
9191
cpu_count = info.cpus().len();
9292
mem_total = info.total_memory();
9393
}
94-
let ingester_metrics = fetch_ingesters_metrics().await;
94+
let ingestor_metrics = fetch_ingestors_metrics().await;
9595

9696
Self {
9797
deployment_id: storage::StorageMetadata::global().deployment_id,
@@ -106,12 +106,12 @@ impl Report {
106106
server_mode: CONFIG.parseable.mode.to_string(),
107107
version: current().released_version.to_string(),
108108
commit_hash: current().commit_hash,
109-
active_ingesters: ingester_metrics.0,
110-
inactive_ingesters: ingester_metrics.1,
111-
stream_count: ingester_metrics.2,
112-
total_events_count: ingester_metrics.3,
113-
total_json_bytes: ingester_metrics.4,
114-
total_parquet_bytes: ingester_metrics.5,
109+
active_ingestors: ingestor_metrics.0,
110+
inactive_ingestors: ingestor_metrics.1,
111+
stream_count: ingestor_metrics.2,
112+
total_events_count: ingestor_metrics.3,
113+
total_json_bytes: ingestor_metrics.4,
114+
total_parquet_bytes: ingestor_metrics.5,
115115
metrics: build_metrics().await,
116116
}
117117
}
@@ -122,7 +122,7 @@ impl Report {
122122
}
123123
}
124124

125-
/// build the node metrics for the node ingester endpoint
125+
/// build the node metrics for the node ingestor endpoint
126126
pub async fn get_analytics(_: HttpRequest) -> impl Responder {
127127
let json = NodeMetrics::build();
128128
web::Json(json)
@@ -148,23 +148,23 @@ fn total_event_stats() -> (u64, u64, u64) {
148148
(total_events, total_json_bytes, total_parquet_bytes)
149149
}
150150

151-
async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) {
151+
async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
152152
let event_stats = total_event_stats();
153153
let mut node_metrics =
154154
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
155155

156156
let mut vec = vec![];
157-
let mut active_ingesters = 0u64;
158-
let mut offline_ingesters = 0u64;
157+
let mut active_ingestors = 0u64;
158+
let mut offline_ingestors = 0u64;
159159
if CONFIG.parseable.mode == Mode::Query {
160160
// send analytics for ingest servers
161161

162-
// ingester infos should be valid here, if not some thing is wrong
163-
let ingester_infos = cluster::get_ingester_info().await.unwrap();
162+
// ingestor infos should be valid here, if not some thing is wrong
163+
let ingestor_infos = cluster::get_ingestor_info().await.unwrap();
164164

165-
for im in ingester_infos {
165+
for im in ingestor_infos {
166166
if !check_liveness(&im.domain_name).await {
167-
offline_ingesters += 1;
167+
offline_ingestors += 1;
168168
continue;
169169
}
170170

@@ -185,15 +185,15 @@ async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) {
185185

186186
let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
187187
vec.push(data);
188-
active_ingesters += 1;
188+
active_ingestors += 1;
189189
}
190190

191191
node_metrics.accumulate(&mut vec);
192192
}
193193

194194
(
195-
active_ingesters,
196-
offline_ingesters,
195+
active_ingestors,
196+
offline_ingestors,
197197
node_metrics.stream_count,
198198
node_metrics.total_events_count,
199199
node_metrics.total_json_bytes,

server/src/banner.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
7777
eprintln!(
7878
"
7979
{}
80-
Address:\t\t\t\t\t{}
81-
Credentials:\t\t\t\t\t{}
82-
Server Mode:\t\t\t\t\t\"{}\"
83-
LLM Status:\t\t\t\t\t\"{}\"",
80+
Address: {}
81+
Credentials: {}
82+
Server Mode: \"{}\"
83+
LLM Status: \"{}\"",
8484
"Server:".to_string().bold(),
8585
address,
8686
credentials,
87-
config.parseable.mode.to_str(),
87+
config.get_server_mode_string(),
8888
llm_status
8989
);
9090
}
@@ -101,8 +101,8 @@ async fn storage_info(config: &Config) {
101101
eprintln!(
102102
"
103103
{}
104-
Storage Mode:\t\t\t\t\t\"{}\"
105-
Staging Path:\t\t\t\t\t\"{}\"",
104+
Storage Mode: \"{}\"
105+
Staging Path: \"{}\"",
106106
"Storage:".to_string().bold(),
107107
config.get_storage_mode_string(),
108108
config.staging_dir().to_string_lossy(),
@@ -116,7 +116,7 @@ async fn storage_info(config: &Config) {
116116

117117
eprintln!(
118118
"\
119-
{:8}Cache:\t\t\t\t\t\"{}\", (size: {})",
119+
{:8}Cache: \"{}\", (size: {})",
120120
"",
121121
path.display(),
122122
size
@@ -125,7 +125,7 @@ async fn storage_info(config: &Config) {
125125

126126
eprintln!(
127127
"\
128-
{:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})",
128+
{:8}Store: \"{}\", (latency: {:?})",
129129
"",
130130
storage.get_endpoint(),
131131
latency

server/src/catalog.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ pub async fn update_snapshot(
118118
let mut ch = false;
119119
for m in manifests.iter() {
120120
let s = get_address();
121-
let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE);
121+
let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
122122
if m.manifest_path.contains(&p) {
123123
ch = true;
124124
}
@@ -152,7 +152,7 @@ pub async fn update_snapshot(
152152
};
153153

154154
let addr = get_address();
155-
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
155+
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
156156
let path =
157157
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
158158
storage
@@ -186,7 +186,7 @@ pub async fn update_snapshot(
186186
};
187187

188188
let addr = get_address();
189-
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
189+
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
190190
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
191191
storage
192192
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())

server/src/cli.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,6 @@ impl Cli {
322322
.env("P_INGESTOR_URL")
323323
.value_name("URL")
324324
.required(false)
325-
.value_parser(validation::socket_addr)
326325
.help("URL to connect to this specific ingestor. Default is the address of the server.")
327326
)
328327
.arg(
@@ -371,7 +370,7 @@ impl FromArgMatches for Cli {
371370
self.ingestor_url = m
372371
.get_one::<String>(Self::INGESTOR_URL)
373372
.cloned()
374-
.unwrap_or_else(|| self.address.clone());
373+
.unwrap_or_else(String::default);
375374

376375
self.local_staging_path = m
377376
.get_one::<PathBuf>(Self::STAGING)

server/src/handlers/http.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use serde_json::Value;
2323

2424
use crate::option::CONFIG;
2525

26-
use self::{cluster::get_ingester_info, query::Query};
26+
use self::{cluster::get_ingestor_info, query::Query};
2727

2828
pub(crate) mod about;
2929
pub mod cluster;
@@ -94,10 +94,10 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
9494

9595
/// unused for now, might need it later
9696
#[allow(unused)]
97-
pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
98-
// send the query request to the ingester
97+
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
98+
// send the query request to the ingestor
9999
let mut res = vec![];
100-
let ima = get_ingester_info().await.unwrap();
100+
let ima = get_ingestor_info().await.unwrap();
101101

102102
for im in ima.iter() {
103103
let uri = format!(

server/src/handlers/http/about.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub async fn about() -> Json<serde_json::Value> {
6565
let current_version = format!("v{}", current_release.released_version);
6666
let commit = current_release.commit_hash;
6767
let deployment_id = meta.deployment_id.to_string();
68-
let mode = CONFIG.parseable.mode.to_str();
68+
let mode = CONFIG.get_server_mode_string();
6969
let staging = if CONFIG.parseable.mode == Mode::Query {
7070
"".to_string()
7171
} else {

0 commit comments

Comments
 (0)