Skip to content

Commit 01a4e55

Browse files
review comments incorporated
query server creates the internal stream and syncs it with ingestors on server start; no ingestion can happen from an ingestor into the internal stream
1 parent 275e8d9 commit 01a4e55

File tree

9 files changed

+127
-41
lines changed

9 files changed

+127
-41
lines changed

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
3434
const AUTHORIZATION_KEY: &str = "authorization";
3535
const SEPARATOR: char = '^';
3636
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
37+
const STREAM_TYPE_KEY: &str = "x-p-stream-type";
3738
const OIDC_SCOPE: &str = "openid profile email";
3839
const COOKIE_AGE_DAYS: usize = 7;
3940
const SESSION_COOKIE_NAME: &str = "session";

server/src/handlers/http/cluster/mod.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ use crate::handlers::http::cluster::utils::{
2323
};
2424
use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
2525
use crate::handlers::http::logstream::error::StreamError;
26+
use crate::handlers::STREAM_TYPE_KEY;
2627
use crate::option::CONFIG;
2728

2829
use crate::metrics::prom_utils::Metrics;
2930
use crate::stats::Stats;
3031
use crate::storage::object_storage::ingestor_metadata_path;
31-
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
3232
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
33+
use crate::storage::{StreamType, PARSEABLE_ROOT_DIRECTORY};
3334
use actix_web::http::header;
3435
use actix_web::{HttpRequest, Responder};
3536
use bytes::Bytes;
@@ -145,6 +146,53 @@ pub async fn sync_streams_with_ingestors(
145146
Ok(())
146147
}
147148

149+
/// sync internal streams with all ingestors
150+
pub async fn sync_internal_streams_with_ingestors(stream_name: &str) -> Result<(), StreamError> {
151+
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
152+
log::error!("Fatal: failed to get ingestor info: {:?}", err);
153+
StreamError::Anyhow(err)
154+
})?;
155+
156+
let client = reqwest::Client::new();
157+
for ingestor in ingestor_infos.iter() {
158+
if !utils::check_liveness(&ingestor.domain_name).await {
159+
log::warn!("Ingestor {} is not live", ingestor.domain_name);
160+
continue;
161+
}
162+
let url = format!(
163+
"{}{}/logstream/{}",
164+
ingestor.domain_name,
165+
base_path_without_preceding_slash(),
166+
stream_name
167+
);
168+
let res = client
169+
.put(url)
170+
.header(header::AUTHORIZATION, &ingestor.token)
171+
.header(header::CONTENT_TYPE, "application/json")
172+
.header(STREAM_TYPE_KEY, StreamType::Internal.to_string())
173+
.send()
174+
.await
175+
.map_err(|err| {
176+
log::error!(
177+
"Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
178+
ingestor.domain_name,
179+
err
180+
);
181+
StreamError::Network(err)
182+
})?;
183+
184+
if !res.status().is_success() {
185+
log::error!(
186+
"failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
187+
ingestor.domain_name,
188+
res.text().await
189+
);
190+
}
191+
}
192+
193+
Ok(())
194+
}
195+
148196
pub async fn fetch_daily_stats_from_ingestors(
149197
stream_name: &str,
150198
date: &str,
@@ -572,7 +620,6 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
572620

573621
pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
574622
log::info!("Setting up schedular for cluster metrics ingestion");
575-
576623
let mut scheduler = AsyncScheduler::new();
577624
scheduler
578625
.every(CLUSTER_METRICS_INTERVAL_SECONDS)
@@ -583,11 +630,9 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
583630
if !metrics.is_empty() {
584631
log::info!("Cluster metrics fetched successfully from all ingestors");
585632
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
586-
let stream_name = INTERNAL_STREAM_NAME;
587-
588633
if matches!(
589634
ingest_internal_stream(
590-
stream_name.to_string(),
635+
INTERNAL_STREAM_NAME.to_string(),
591636
bytes::Bytes::from(metrics_bytes),
592637
)
593638
.await,

server/src/handlers/http/ingest.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
*
1717
*/
1818

19-
use super::cluster::INTERNAL_STREAM_NAME;
20-
use super::logstream::error::CreateStreamError;
19+
use super::logstream::error::{CreateStreamError, StreamError};
2120
use super::users::dashboards::DashboardError;
2221
use super::users::filters::FiltersError;
2322
use super::{kinesis, otel};
@@ -58,14 +57,13 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
5857
{
5958
let stream_name = stream_name.to_str().unwrap().to_owned();
6059
let internal_stream_names = STREAM_INFO.list_internal_streams();
61-
62-
if internal_stream_names.contains(&stream_name) || stream_name == INTERNAL_STREAM_NAME {
60+
if internal_stream_names.contains(&stream_name) {
6361
return Err(PostError::Invalid(anyhow::anyhow!(
6462
"The stream {} is reserved for internal use and cannot be ingested into",
6563
stream_name
6664
)));
6765
}
68-
create_stream_if_not_exists(&stream_name, StreamType::UserDefined).await?;
66+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
6967

7068
flatten_and_push_logs(req, body, stream_name).await?;
7169
Ok(HttpResponse::Ok().finish())
@@ -75,7 +73,6 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
7573
}
7674

7775
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
78-
create_stream_if_not_exists(&stream_name, StreamType::Internal).await?;
7976
let size: usize = body.len();
8077
let parsed_timestamp = Utc::now().naive_utc();
8178
let (rb, is_first) = {
@@ -119,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
119116
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
120117
{
121118
let stream_name = stream_name.to_str().unwrap().to_owned();
122-
create_stream_if_not_exists(&stream_name, StreamType::UserDefined).await?;
119+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
123120

124121
//flatten logs
125122
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
@@ -180,7 +177,7 @@ async fn flatten_and_push_logs(
180177
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
181178
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
182179
let internal_stream_names = STREAM_INFO.list_internal_streams();
183-
if internal_stream_names.contains(&stream_name) || stream_name == INTERNAL_STREAM_NAME {
180+
if internal_stream_names.contains(&stream_name) {
184181
return Err(PostError::Invalid(anyhow::anyhow!(
185182
"Stream {} is an internal stream and cannot be ingested into",
186183
stream_name
@@ -419,7 +416,7 @@ fn into_event_batch(
419416
// Check if the stream exists and create a new stream if doesn't exist
420417
pub async fn create_stream_if_not_exists(
421418
stream_name: &str,
422-
stream_type: StreamType,
419+
stream_type: &str,
423420
) -> Result<(), PostError> {
424421
if STREAM_INFO.stream_exists(stream_name) {
425422
return Ok(());
@@ -494,6 +491,8 @@ pub enum PostError {
494491
DashboardError(#[from] DashboardError),
495492
#[error("Error: {0}")]
496493
CacheError(#[from] CacheError),
494+
#[error("Error: {0}")]
495+
StreamError(#[from] StreamError),
497496
}
498497

499498
impl actix_web::ResponseError for PostError {
@@ -515,6 +514,7 @@ impl actix_web::ResponseError for PostError {
515514
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
516515
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
517516
PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
517+
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
518518
}
519519
}
520520

server/src/handlers/http/logstream.rs

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ use self::error::{CreateStreamError, StreamError};
2020
use super::base_path_without_preceding_slash;
2121
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
2222
use super::cluster::{
23-
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
23+
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
24+
sync_internal_streams_with_ingestors, sync_streams_with_ingestors, INTERNAL_STREAM_NAME,
2425
};
26+
use super::ingest::create_stream_if_not_exists;
2527
use crate::alerts::Alerts;
2628
use crate::handlers::{
27-
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
28-
UPDATE_STREAM_KEY,
29+
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
30+
TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
2931
};
3032
use crate::hottier::{HotTierManager, StreamHotTier};
3133
use crate::metadata::STREAM_INFO;
@@ -180,6 +182,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
180182
create_update_stream(&req, &body, &stream_name).await?;
181183
sync_streams_with_ingestors(req, body, &stream_name).await?;
182184
} else {
185+
if STREAM_INFO.stream_exists(&stream_name) {
186+
return Ok(("Log stream already exists", StatusCode::OK));
187+
}
183188
create_update_stream(&req, &body, &stream_name).await?;
184189
}
185190

@@ -188,12 +193,13 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
188193

189194
fn fetch_headers_from_put_stream_request(
190195
req: &HttpRequest,
191-
) -> (String, String, String, String, String) {
196+
) -> (String, String, String, String, String, String) {
192197
let mut time_partition = String::default();
193198
let mut time_partition_limit = String::default();
194199
let mut custom_partition = String::default();
195200
let mut static_schema_flag = String::default();
196201
let mut update_stream = String::default();
202+
let mut stream_type = StreamType::UserDefined.to_string();
197203
req.headers().iter().for_each(|(key, value)| {
198204
if key == TIME_PARTITION_KEY {
199205
time_partition = value.to_str().unwrap().to_string();
@@ -210,6 +216,9 @@ fn fetch_headers_from_put_stream_request(
210216
if key == UPDATE_STREAM_KEY {
211217
update_stream = value.to_str().unwrap().to_string();
212218
}
219+
if key == STREAM_TYPE_KEY {
220+
stream_type = value.to_str().unwrap().to_string();
221+
}
213222
});
214223

215224
(
@@ -218,6 +227,7 @@ fn fetch_headers_from_put_stream_request(
218227
custom_partition,
219228
static_schema_flag,
220229
update_stream,
230+
stream_type,
221231
)
222232
}
223233

@@ -305,8 +315,14 @@ async fn create_update_stream(
305315
body: &Bytes,
306316
stream_name: &str,
307317
) -> Result<(), StreamError> {
308-
let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
309-
fetch_headers_from_put_stream_request(req);
318+
let (
319+
time_partition,
320+
time_partition_limit,
321+
custom_partition,
322+
static_schema_flag,
323+
update_stream,
324+
stream_type,
325+
) = fetch_headers_from_put_stream_request(req);
310326

311327
if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
312328
return Err(StreamError::Custom {
@@ -378,7 +394,7 @@ async fn create_update_stream(
378394
&custom_partition,
379395
&static_schema_flag,
380396
schema,
381-
StreamType::UserDefined,
397+
&stream_type,
382398
)
383399
.await?;
384400

@@ -647,7 +663,9 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
647663
let stats = stats::get_current_stats(&stream_name, "json")
648664
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
649665

650-
let ingestor_stats = if CONFIG.parseable.mode == Mode::Query {
666+
let ingestor_stats = if CONFIG.parseable.mode == Mode::Query
667+
&& STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::UserDefined.to_string()
668+
{
651669
Some(fetch_stats_from_ingestors(&stream_name).await?)
652670
} else {
653671
None
@@ -836,11 +854,11 @@ pub async fn create_stream(
836854
custom_partition: &str,
837855
static_schema_flag: &str,
838856
schema: Arc<Schema>,
839-
stream_type: StreamType,
857+
stream_type: &str,
840858
) -> Result<(), CreateStreamError> {
841859
// fail to proceed if invalid stream name
842-
if stream_type != StreamType::Internal {
843-
validator::stream_name(&stream_name, &stream_type)?;
860+
if stream_type != StreamType::Internal.to_string() {
861+
validator::stream_name(&stream_name, stream_type)?;
844862
}
845863
// Proceed to create log stream if it doesn't exist
846864
let storage = CONFIG.storage().get_object_store();
@@ -876,6 +894,7 @@ pub async fn create_stream(
876894
custom_partition.to_string(),
877895
static_schema_flag.to_string(),
878896
static_schema,
897+
stream_type,
879898
);
880899
}
881900
Err(err) => {
@@ -1041,6 +1060,16 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
10411060
StatusCode::OK,
10421061
))
10431062
}
1063+
1064+
pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
1065+
if create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string())
1066+
.await
1067+
.is_ok()
1068+
{
1069+
sync_internal_streams_with_ingestors(INTERNAL_STREAM_NAME).await?;
1070+
}
1071+
Ok(())
1072+
}
10441073
#[allow(unused)]
10451074
fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
10461075
match kind {

server/src/handlers/http/modal/query_server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::handlers::airplane;
2020
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
21+
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
2122
use crate::handlers::http::middleware::RouteExt;
2223
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
2324
use crate::hottier::HotTierManager;
@@ -174,6 +175,9 @@ impl QueryServer {
174175

175176
migration::run_migration(&CONFIG).await?;
176177

178+
//create internal stream at server start
179+
create_internal_stream_if_not_exists().await?;
180+
177181
FILTERS.load().await?;
178182
DASHBOARDS.load().await?;
179183
// track all parquet files already in the data directory

0 commit comments

Comments (0)