Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 1 addition & 70 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::metrics::prom_utils::Metrics;
use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::get_staging_metadata;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
Expand Down Expand Up @@ -65,7 +64,6 @@ pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
body: Bytes,
stream_name: &str,
skip_ingestor: Option<String>,
) -> Result<(), StreamError> {
let mut reqwest_headers = http_header::HeaderMap::new();

Expand All @@ -79,15 +77,7 @@ pub async fn sync_streams_with_ingestors(

let client = reqwest::Client::new();

let final_ingestor_infos = match skip_ingestor {
None => ingestor_infos,
Some(skip_ingestor) => ingestor_infos
.into_iter()
.filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
.collect::<Vec<IngestorMetadata>>(),
};

for ingestor in final_ingestor_infos {
for ingestor in ingestor_infos {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
Expand Down Expand Up @@ -852,62 +842,3 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {

Ok(())
}

pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
let client = reqwest::Client::new();

let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
})?;
let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
let token = staging_metadata.querier_auth_token.unwrap();

if !check_liveness(&querier_endpoint).await {
log::warn!("Querier {} is not live", querier_endpoint);
return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
}

let url = format!(
"{}{}/logstream/{}?skip_ingestors={}",
querier_endpoint,
base_path_without_preceding_slash(),
stream_name,
CONFIG.parseable.ingestor_endpoint,
);

let response = client
.put(&url)
.header(header::AUTHORIZATION, &token)
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
&url,
err
);
StreamError::Network(err)
})?;

let status = response.status();

if !status.is_success() {
let response_text = response.text().await.map_err(|err| {
log::error!("Failed to read response text from querier: {}", &url);
StreamError::Network(err)
})?;

log::error!(
"Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
&url,
response_text
);

return Err(StreamError::Anyhow(anyhow::anyhow!(
"Request failed with status: {}",
status,
)));
}

Ok(())
}
78 changes: 32 additions & 46 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ use crate::event::{
error::EventError,
format::{self, EventFormat},
};
use crate::handlers::http::cluster::forward_create_stream_request;
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::{self, STREAM_INFO};
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::storage::{LogStream, ObjectStorageError, StreamType};
use crate::storage::{ObjectStorageError, StreamType};
use crate::utils::header_parsing::ParseHeaderError;
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_array::RecordBatch;
Expand Down Expand Up @@ -153,7 +153,17 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
)));
}
if !STREAM_INFO.stream_exists(&stream_name) {
return Err(PostError::StreamNotFound(stream_name));
// For distributed deployments, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if CONFIG.parseable.mode != Mode::All {
match create_stream_and_schema_from_storage(&stream_name).await {
Ok(true) => {}
Ok(false) | Err(_) => return Err(PostError::StreamNotFound(stream_name.clone())),
}
} else {
return Err(PostError::StreamNotFound(stream_name.clone()));
}
}

flatten_and_push_logs(req, body, stream_name).await?;
Expand Down Expand Up @@ -190,49 +200,25 @@ pub async fn create_stream_if_not_exists(
stream_exists = true;
return Ok(stream_exists);
}
match &CONFIG.parseable.mode {
Mode::All | Mode::Query => {
super::logstream::create_stream(
stream_name.to_string(),
"",
"",
"",
"",
Arc::new(Schema::empty()),
stream_type,
)
.await?;
}
Mode::Ingest => {
// here the ingest server has not found the stream
// so it should check if the stream exists in storage
let store = CONFIG.storage().get_object_store();
let streams = store.list_streams().await?;
if !streams.contains(&LogStream {
name: stream_name.to_owned(),
}) {
match forward_create_stream_request(stream_name).await {
Ok(()) => log::info!("Stream {} created", stream_name),
Err(e) => {
return Err(PostError::Invalid(anyhow::anyhow!(
"Unable to create stream: {} using query server. Error: {}",
stream_name,
e.to_string(),
)))
}
};
}
metadata::STREAM_INFO
.upsert_stream_info(
&*store,
LogStream {
name: stream_name.to_owned(),
},
)
.await
.map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
}

// For distributed deployments, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if CONFIG.parseable.mode != Mode::All {
return Ok(create_stream_and_schema_from_storage(stream_name).await?);
}

super::logstream::create_stream(
stream_name.to_string(),
"",
"",
"",
"",
Arc::new(Schema::empty()),
stream_type,
)
.await?;

Ok(stream_exists)
}

Expand Down
Loading
Loading