Skip to content

Commit 273d022

Browse files
1. removed the method created for syncing the internal stream
2. modified sync_streams_with_ingestors so that it can be used for all streams
1 parent 01a4e55 commit 273d022

File tree

2 files changed

+31
-66
lines changed

2 files changed

+31
-66
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,18 @@ use crate::handlers::http::cluster::utils::{
2323
};
2424
use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
2525
use crate::handlers::http::logstream::error::StreamError;
26-
use crate::handlers::STREAM_TYPE_KEY;
2726
use crate::option::CONFIG;
2827

2928
use crate::metrics::prom_utils::Metrics;
3029
use crate::stats::Stats;
3130
use crate::storage::object_storage::ingestor_metadata_path;
31+
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
3232
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
33-
use crate::storage::{StreamType, PARSEABLE_ROOT_DIRECTORY};
34-
use actix_web::http::header;
33+
use actix_web::http::header::{self, HeaderMap};
3534
use actix_web::{HttpRequest, Responder};
3635
use bytes::Bytes;
3736
use chrono::Utc;
38-
use http::StatusCode;
37+
use http::{header as http_header, StatusCode};
3938
use itertools::Itertools;
4039
use relative_path::RelativePathBuf;
4140
use serde::de::Error;
@@ -97,57 +96,15 @@ pub async fn sync_cache_with_ingestors(
9796

9897
// forward the request to all ingestors to keep them in sync
9998
pub async fn sync_streams_with_ingestors(
100-
req: HttpRequest,
99+
headers: HeaderMap,
101100
body: Bytes,
102101
stream_name: &str,
103102
) -> Result<(), StreamError> {
104-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
105-
log::error!("Fatal: failed to get ingestor info: {:?}", err);
106-
StreamError::Anyhow(err)
107-
})?;
103+
let mut reqwest_headers = http_header::HeaderMap::new();
108104

109-
let client = reqwest::Client::new();
110-
for ingestor in ingestor_infos.iter() {
111-
if !utils::check_liveness(&ingestor.domain_name).await {
112-
log::warn!("Ingestor {} is not live", ingestor.domain_name);
113-
continue;
114-
}
115-
let url = format!(
116-
"{}{}/logstream/{}",
117-
ingestor.domain_name,
118-
base_path_without_preceding_slash(),
119-
stream_name
120-
);
121-
let res = client
122-
.put(url)
123-
.headers(req.headers().into())
124-
.header(header::AUTHORIZATION, &ingestor.token)
125-
.body(body.clone())
126-
.send()
127-
.await
128-
.map_err(|err| {
129-
log::error!(
130-
"Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
131-
ingestor.domain_name,
132-
err
133-
);
134-
StreamError::Network(err)
135-
})?;
136-
137-
if !res.status().is_success() {
138-
log::error!(
139-
"failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
140-
ingestor.domain_name,
141-
res
142-
);
143-
}
105+
for (key, value) in headers.iter() {
106+
reqwest_headers.insert(key.clone(), value.clone());
144107
}
145-
146-
Ok(())
147-
}
148-
149-
/// sync internal streams with all ingestors
150-
pub async fn sync_internal_streams_with_ingestors(stream_name: &str) -> Result<(), StreamError> {
151108
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
152109
log::error!("Fatal: failed to get ingestor info: {:?}", err);
153110
StreamError::Anyhow(err)
@@ -167,9 +124,9 @@ pub async fn sync_internal_streams_with_ingestors(stream_name: &str) -> Result<(
167124
);
168125
let res = client
169126
.put(url)
127+
.headers(reqwest_headers.clone())
170128
.header(header::AUTHORIZATION, &ingestor.token)
171-
.header(header::CONTENT_TYPE, "application/json")
172-
.header(STREAM_TYPE_KEY, StreamType::Internal.to_string())
129+
.body(body.clone())
173130
.send()
174131
.await
175132
.map_err(|err| {
@@ -185,7 +142,7 @@ pub async fn sync_internal_streams_with_ingestors(stream_name: &str) -> Result<(
185142
log::error!(
186143
"failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
187144
ingestor.domain_name,
188-
res.text().await
145+
res
189146
);
190147
}
191148
}

server/src/handlers/http/logstream.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use self::error::{CreateStreamError, StreamError};
2020
use super::base_path_without_preceding_slash;
2121
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
2222
use super::cluster::{
23-
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
24-
sync_internal_streams_with_ingestors, sync_streams_with_ingestors, INTERNAL_STREAM_NAME,
23+
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
24+
INTERNAL_STREAM_NAME,
2525
};
2626
use super::ingest::create_stream_if_not_exists;
2727
use crate::alerts::Alerts;
@@ -44,16 +44,19 @@ use crate::{
4444
};
4545

4646
use crate::{metadata, validator};
47+
use actix_web::http::header::{self, HeaderMap};
4748
use actix_web::http::StatusCode;
4849
use actix_web::{web, HttpRequest, Responder};
4950
use arrow_schema::{Field, Schema};
5051
use bytes::Bytes;
5152
use chrono::Utc;
53+
use http::{HeaderName, HeaderValue};
5254
use itertools::Itertools;
5355
use serde_json::Value;
5456
use std::collections::HashMap;
5557
use std::fs;
5658
use std::num::NonZeroU32;
59+
use std::str::FromStr;
5760
use std::sync::Arc;
5861

5962
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
@@ -179,12 +182,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
179182
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
180183

181184
if CONFIG.parseable.mode == Mode::Query {
182-
create_update_stream(&req, &body, &stream_name).await?;
183-
sync_streams_with_ingestors(req, body, &stream_name).await?;
185+
let headers = create_update_stream(&req, &body, &stream_name).await?;
186+
sync_streams_with_ingestors(headers, body, &stream_name).await?;
184187
} else {
185-
if STREAM_INFO.stream_exists(&stream_name) {
186-
return Ok(("Log stream already exists", StatusCode::OK));
187-
}
188188
create_update_stream(&req, &body, &stream_name).await?;
189189
}
190190

@@ -314,7 +314,7 @@ async fn create_update_stream(
314314
req: &HttpRequest,
315315
body: &Bytes,
316316
stream_name: &str,
317-
) -> Result<(), StreamError> {
317+
) -> Result<HeaderMap, StreamError> {
318318
let (
319319
time_partition,
320320
time_partition_limit,
@@ -355,17 +355,16 @@ async fn create_update_stream(
355355
let time_partition_days = validate_time_partition_limit(&time_partition_limit)?;
356356
update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days)
357357
.await?;
358-
return Ok(());
358+
return Ok(req.headers().clone());
359359
}
360360

361361
if !custom_partition.is_empty() {
362362
validate_custom_partition(&custom_partition)?;
363363
update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?;
364-
return Ok(());
365364
} else {
366365
update_custom_partition_in_stream(stream_name.to_string(), "").await?;
367-
return Ok(());
368366
}
367+
return Ok(req.headers().clone());
369368
}
370369
let mut time_partition_in_days = "";
371370
if !time_partition_limit.is_empty() {
@@ -398,7 +397,7 @@ async fn create_update_stream(
398397
)
399398
.await?;
400399

401-
Ok(())
400+
Ok(req.headers().clone())
402401
}
403402
pub async fn put_alert(
404403
req: HttpRequest,
@@ -1066,7 +1065,16 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
10661065
.await
10671066
.is_ok()
10681067
{
1069-
sync_internal_streams_with_ingestors(INTERNAL_STREAM_NAME).await?;
1068+
let mut header_map = HeaderMap::new();
1069+
header_map.insert(
1070+
HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
1071+
HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
1072+
);
1073+
header_map.insert(
1074+
header::CONTENT_TYPE,
1075+
HeaderValue::from_static("application/json"),
1076+
);
1077+
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
10701078
}
10711079
Ok(())
10721080
}

0 commit comments

Comments
 (0)