
Commit e99f11b

fix: ingestor-querier sync
This PR makes the following changes:

1. Removed the querier endpoint and token from parseable.json.
2. Added migration steps to update parseable.json for the latest release.
3. Removed the ingestor-to-querier sync used for ingestion when a new stream is created.
4. Updated the logic to list streams from storage.
5. Updated the stream migration logic that runs at server start.
6. Updated the places where queriers/ingestors need to check whether a stream has been created in S3 before confirming the stream's existence.

1 parent af5903d commit e99f11b

File tree: 14 files changed (+351, -282 lines)
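
For orientation, the common thread in the diffs below is that a querier or ingestor that does not yet have a stream in its in-memory STREAM_INFO no longer errors out or forwards the request to the querier; it hydrates the stream from object storage via create_stream_and_schema_from_storage (defined in modal/utils/logstream_utils.rs, which is not part of this diff). The following minimal, self-contained sketch illustrates that flow with stand-in types (ObjectStore, StreamRegistry) rather than Parseable's real API.

// Hypothetical stand-ins for Parseable's object store and in-memory stream
// registry; the real helper is create_stream_and_schema_from_storage, whose
// internals are not shown in this commit.
use std::collections::{HashMap, HashSet};

struct ObjectStore {
    // stream name -> serialized schema stored under the stream's root directory
    streams: HashMap<String, String>,
}

impl ObjectStore {
    fn list_streams(&self) -> HashSet<String> {
        self.streams.keys().cloned().collect()
    }
    fn get_schema(&self, name: &str) -> Option<String> {
        self.streams.get(name).cloned()
    }
}

#[derive(Default)]
struct StreamRegistry {
    schemas: HashMap<String, String>,
}

impl StreamRegistry {
    fn stream_exists(&self, name: &str) -> bool {
        self.schemas.contains_key(name)
    }
    // Sketch of "create stream and schema from storage": it only succeeds if
    // the stream already exists in the object store.
    fn create_from_storage(&mut self, store: &ObjectStore, name: &str) -> Result<(), String> {
        let schema = store
            .get_schema(name)
            .ok_or_else(|| format!("stream {name} not found in storage"))?;
        self.schemas.insert(name.to_string(), schema);
        Ok(())
    }
}

fn main() {
    let store = ObjectStore {
        streams: HashMap::from([("app_logs".to_string(), "{\"fields\":[]}".to_string())]),
    };
    let mut registry = StreamRegistry::default();

    // Listing streams from storage (rather than from memory) is what lets a
    // node discover streams created by other nodes.
    assert!(store.list_streams().contains("app_logs"));

    // A node that has not seen "app_logs" yet hydrates it from storage
    // instead of returning StreamNotFound or forwarding to the querier.
    let name = "app_logs";
    if !registry.stream_exists(name) {
        registry
            .create_from_storage(&store, name)
            .expect("stream should already exist in object storage");
    }
    assert!(registry.stream_exists(name));
    println!("{name} is now registered locally");
}
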

14 files changed

+351
-282
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 1 addition & 70 deletions
@@ -30,7 +30,6 @@ use crate::metrics::prom_utils::Metrics;
 use crate::rbac::role::model::DefaultPrivilege;
 use crate::rbac::user::User;
 use crate::stats::Stats;
-use crate::storage::get_staging_metadata;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
 use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -65,7 +64,6 @@ pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
     body: Bytes,
     stream_name: &str,
-    skip_ingestor: Option<String>,
 ) -> Result<(), StreamError> {
     let mut reqwest_headers = http_header::HeaderMap::new();
 
@@ -79,15 +77,7 @@
 
     let client = reqwest::Client::new();
 
-    let final_ingestor_infos = match skip_ingestor {
-        None => ingestor_infos,
-        Some(skip_ingestor) => ingestor_infos
-            .into_iter()
-            .filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
-            .collect::<Vec<IngestorMetadata>>(),
-    };
-
-    for ingestor in final_ingestor_infos {
+    for ingestor in ingestor_infos {
         if !utils::check_liveness(&ingestor.domain_name).await {
             log::warn!("Ingestor {} is not live", ingestor.domain_name);
             continue;
@@ -852,62 +842,3 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
 
     Ok(())
 }
-
-pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
-    let client = reqwest::Client::new();
-
-    let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
-        StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
-    })?;
-    let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
-    let token = staging_metadata.querier_auth_token.unwrap();
-
-    if !check_liveness(&querier_endpoint).await {
-        log::warn!("Querier {} is not live", querier_endpoint);
-        return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
-    }
-
-    let url = format!(
-        "{}{}/logstream/{}?skip_ingestors={}",
-        querier_endpoint,
-        base_path_without_preceding_slash(),
-        stream_name,
-        CONFIG.parseable.ingestor_endpoint,
-    );
-
-    let response = client
-        .put(&url)
-        .header(header::AUTHORIZATION, &token)
-        .send()
-        .await
-        .map_err(|err| {
-            log::error!(
-                "Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
-                &url,
-                err
-            );
-            StreamError::Network(err)
-        })?;
-
-    let status = response.status();
-
-    if !status.is_success() {
-        let response_text = response.text().await.map_err(|err| {
-            log::error!("Failed to read response text from querier: {}", &url);
-            StreamError::Network(err)
-        })?;
-
-        log::error!(
-            "Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
-            &url,
-            response_text
-        );
-
-        return Err(StreamError::Anyhow(anyhow::anyhow!(
-            "Request failed with status: {}",
-            status,
-        )));
-    }
-
-    Ok(())
-}
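
With skip_ingestor gone, sync_streams_with_ingestors simply fans the request out to every registered ingestor, skipping only the ones that fail the liveness check. Here is a rough, synchronous sketch of that loop using hypothetical IngestorInfo and is_live stand-ins; the real function is async and sends a PUT per ingestor via reqwest.

// Minimal sketch of the simplified fan-out after this change.
struct IngestorInfo {
    domain_name: String,
    live: bool,
}

fn is_live(ingestor: &IngestorInfo) -> bool {
    // Stand-in for utils::check_liveness(&ingestor.domain_name).await
    ingestor.live
}

fn sync_streams_with_ingestors(ingestors: &[IngestorInfo], stream_name: &str) {
    for ingestor in ingestors {
        if !is_live(ingestor) {
            eprintln!("Ingestor {} is not live", ingestor.domain_name);
            continue;
        }
        // Stand-in for the per-ingestor logstream sync request sent in the
        // real implementation.
        println!("syncing stream {stream_name} to {}", ingestor.domain_name);
    }
}

fn main() {
    let ingestors = vec![
        IngestorInfo { domain_name: "http://ingestor-1:8000/".into(), live: true },
        IngestorInfo { domain_name: "http://ingestor-2:8000/".into(), live: false },
    ];
    sync_streams_with_ingestors(&ingestors, "app_logs");
}
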

server/src/handlers/http/ingest.rs

Lines changed: 24 additions & 44 deletions
@@ -26,11 +26,11 @@ use crate::event::{
     error::EventError,
     format::{self, EventFormat},
 };
-use crate::handlers::http::cluster::forward_create_stream_request;
+use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
 use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
 use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
-use crate::metadata::{self, STREAM_INFO};
+use crate::metadata::STREAM_INFO;
 use crate::option::{Mode, CONFIG};
 use crate::storage::{LogStream, ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
@@ -153,7 +153,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
         )));
     }
     if !STREAM_INFO.stream_exists(&stream_name) {
-        return Err(PostError::StreamNotFound(stream_name));
+        create_stream_and_schema_from_storage(&stream_name).await?;
     }
 
     flatten_and_push_logs(req, body, stream_name).await?;
@@ -190,49 +190,29 @@ pub async fn create_stream_if_not_exists(
         stream_exists = true;
         return Ok(stream_exists);
     }
-    match &CONFIG.parseable.mode {
-        Mode::All | Mode::Query => {
-            super::logstream::create_stream(
-                stream_name.to_string(),
-                "",
-                "",
-                "",
-                "",
-                Arc::new(Schema::empty()),
-                stream_type,
-            )
-            .await?;
-        }
-        Mode::Ingest => {
-            // here the ingest server has not found the stream
-            // so it should check if the stream exists in storage
-            let store = CONFIG.storage().get_object_store();
-            let streams = store.list_streams().await?;
-            if !streams.contains(&LogStream {
-                name: stream_name.to_owned(),
-            }) {
-                match forward_create_stream_request(stream_name).await {
-                    Ok(()) => log::info!("Stream {} created", stream_name),
-                    Err(e) => {
-                        return Err(PostError::Invalid(anyhow::anyhow!(
-                            "Unable to create stream: {} using query server. Error: {}",
-                            stream_name,
-                            e.to_string(),
-                        )))
-                    }
-                };
-            }
-            metadata::STREAM_INFO
-                .upsert_stream_info(
-                    &*store,
-                    LogStream {
-                        name: stream_name.to_owned(),
-                    },
-                )
-                .await
-                .map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
+
+    if CONFIG.parseable.mode != Mode::All {
+        let store = CONFIG.storage().get_object_store();
+        let streams = store.list_streams().await?;
+        if streams.contains(&LogStream {
+            name: stream_name.to_owned(),
+        }) {
+            create_stream_and_schema_from_storage(stream_name).await?;
+            return Ok(stream_exists);
         }
     }
+
+    super::logstream::create_stream(
+        stream_name.to_string(),
+        "",
+        "",
+        "",
+        "",
+        Arc::new(Schema::empty()),
+        stream_type,
+    )
+    .await?;
+
     Ok(stream_exists)
 }
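
The rewritten create_stream_if_not_exists now prefers hydrating a stream that already exists in object storage over creating it again, and otherwise creates the stream locally with an empty schema instead of forwarding the request to the querier. Below is a compact sketch of that decision, with hypothetical stand-ins for the mode enum, the local registry, and the storage listing.

// Sketch of the decision flow on a node running in Ingest or Query mode;
// the types here are stand-ins, not Parseable's internals.
use std::collections::HashSet;

#[derive(PartialEq)]
enum Mode { All, Query, Ingest }

fn create_stream_if_not_exists(
    mode: &Mode,
    local_streams: &mut HashSet<String>,
    storage_streams: &HashSet<String>,
    stream_name: &str,
) -> bool {
    // Already known locally: nothing to do, report that it existed.
    if local_streams.contains(stream_name) {
        return true;
    }
    // On non-All nodes, prefer hydrating a stream another node already
    // created in object storage over creating it again.
    if *mode != Mode::All && storage_streams.contains(stream_name) {
        // Stand-in for create_stream_and_schema_from_storage.
        local_streams.insert(stream_name.to_string());
        return false;
    }
    // Otherwise create a brand-new stream locally (stand-in for
    // logstream::create_stream with an empty schema); no request is
    // forwarded to the querier anymore.
    local_streams.insert(stream_name.to_string());
    false
}

fn main() {
    let storage: HashSet<String> = HashSet::from(["app_logs".to_string()]);
    let mut local = HashSet::new();

    // First call hydrates from storage; second call sees the stream locally.
    assert!(!create_stream_if_not_exists(&Mode::Ingest, &mut local, &storage, "app_logs"));
    assert!(create_stream_if_not_exists(&Mode::Ingest, &mut local, &storage, "app_logs"));
}
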

server/src/handlers/http/logstream.rs

Lines changed: 46 additions & 15 deletions
@@ -20,17 +20,19 @@ use self::error::{CreateStreamError, StreamError};
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
 use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
 use super::ingest::create_stream_if_not_exists;
-use super::modal::utils::logstream_utils::create_update_stream;
+use super::modal::utils::logstream_utils::{
+    create_stream_and_schema_from_storage, create_update_stream,
+};
 use crate::alerts::Alerts;
 use crate::event::format::update_data_type_to_datetime;
 use crate::handlers::STREAM_TYPE_KEY;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::STREAM_INFO;
 use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
-use crate::option::CONFIG;
+use crate::option::{Mode, CONFIG};
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::StreamType;
-use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
+use crate::storage::{retention::Retention, StorageDir, StreamInfo};
 use crate::{catalog, event, stats};
 
 use crate::{metadata, validator};
@@ -82,11 +84,12 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
 }
 
 pub async fn list(_: HttpRequest) -> impl Responder {
-    let res: Vec<LogStream> = STREAM_INFO
+    let res = CONFIG
+        .storage()
+        .get_object_store()
         .list_streams()
-        .into_iter()
-        .map(|stream| LogStream { name: stream })
-        .collect();
+        .await
+        .unwrap();
 
     web::Json(res)
 }
@@ -180,7 +183,11 @@ pub async fn put_alert(
     validator::alert(&alerts)?;
 
     if !STREAM_INFO.stream_initialized(&stream_name)? {
-        return Err(StreamError::UninitializedLogstream);
+        if CONFIG.parseable.mode == Mode::Query {
+            create_stream_and_schema_from_storage(&stream_name).await?;
+        } else {
+            return Err(StreamError::UninitializedLogstream);
+        }
     }
 
     let schema = STREAM_INFO.schema(&stream_name)?;
@@ -218,7 +225,11 @@ pub async fn put_alert(
 pub async fn get_retention(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     if !STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name.to_string()));
+        if CONFIG.parseable.mode == Mode::Query {
+            create_stream_and_schema_from_storage(&stream_name).await?;
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
     let retention = STREAM_INFO.get_retention(&stream_name);
 
@@ -328,7 +339,11 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+        if CONFIG.parseable.mode == Mode::Query {
+            create_stream_and_schema_from_storage(&stream_name).await?;
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     let query_string = req.query_string();
@@ -497,7 +512,11 @@ pub async fn create_stream(
 pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+        if CONFIG.parseable.mode == Mode::Query {
+            create_stream_and_schema_from_storage(&stream_name).await?;
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     let store = CONFIG.storage().get_object_store();
@@ -540,7 +559,11 @@ pub async fn put_stream_hot_tier(
 ) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+        if CONFIG.parseable.mode == Mode::Query {
+            create_stream_and_schema_from_storage(&stream_name).await?;
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
@@ -590,7 +613,11 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+        if CONFIG.parseable.mode == Mode::Query {
+            create_stream_and_schema_from_storage(&stream_name).await?;
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     if CONFIG.parseable.hot_tier_storage_path.is_none() {
@@ -615,7 +642,11 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+        if CONFIG.parseable.mode == Mode::Query {
+            create_stream_and_schema_from_storage(&stream_name).await?;
+        } else {
+            return Err(StreamError::StreamNotFound(stream_name));
+        }
     }
 
     if CONFIG.parseable.hot_tier_storage_path.is_none() {
@@ -654,7 +685,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
            header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        );
-        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
+        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
    }
    Ok(())
 }

server/src/handlers/http/modal/ingest/ingester_logstream.rs

Lines changed: 6 additions & 4 deletions
@@ -7,7 +7,10 @@ use crate::{
     catalog::remove_manifest_from_snapshot,
     event,
     handlers::http::{
-        logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream,
+        logstream::error::StreamError,
+        modal::utils::logstream_utils::{
+            create_stream_and_schema_from_storage, create_update_stream,
+        },
     },
     metadata::{self, STREAM_INFO},
     option::CONFIG,
@@ -22,8 +25,7 @@ pub async fn retention_cleanup(
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let storage = CONFIG.storage().get_object_store();
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        log::error!("Stream {} not found", stream_name.clone());
-        return Err(StreamError::StreamNotFound(stream_name.clone()));
+        create_stream_and_schema_from_storage(&stream_name).await?;
     }
     let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
     let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
@@ -40,7 +42,7 @@ pub async fn retention_cleanup(
 pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-        return Err(StreamError::StreamNotFound(stream_name));
+        create_stream_and_schema_from_storage(&stream_name).await?;
     }
 
     metadata::STREAM_INFO.delete_stream(&stream_name);
