Skip to content

Commit c091026

Browse files
feat: allow stream creation from ingestor in distributed deployments
Co-authored-by: Akshat Agarwal <[email protected]>
1 parent d361b69 commit c091026

File tree

7 files changed

+134
-6
lines changed

7 files changed

+134
-6
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::metrics::prom_utils::Metrics;
3030
use crate::rbac::role::model::DefaultPrivilege;
3131
use crate::rbac::user::User;
3232
use crate::stats::Stats;
33+
use crate::storage::get_staging_metadata;
3334
use crate::storage::object_storage::ingestor_metadata_path;
3435
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
3536
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -841,3 +842,61 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
841842

842843
Ok(())
843844
}
845+
846+
pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
847+
let client = reqwest::Client::new();
848+
849+
let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
850+
StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
851+
})?;
852+
let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
853+
let token = staging_metadata.querier_auth_token.unwrap();
854+
855+
if !check_liveness(&querier_endpoint).await {
856+
log::warn!("Querier {} is not live", querier_endpoint);
857+
return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
858+
}
859+
860+
let url = format!(
861+
"{}{}/logstream/{}",
862+
querier_endpoint,
863+
base_path_without_preceding_slash(),
864+
stream_name
865+
);
866+
867+
let response = client
868+
.put(&url)
869+
.header(header::AUTHORIZATION, &token)
870+
.send()
871+
.await
872+
.map_err(|err| {
873+
log::error!(
874+
"Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
875+
&url,
876+
err
877+
);
878+
StreamError::Network(err)
879+
})?;
880+
881+
let status = response.status();
882+
883+
if !status.is_success() {
884+
let response_text = response.text().await.map_err(|err| {
885+
log::error!("Failed to read response text from querier: {}", &url);
886+
StreamError::Network(err)
887+
})?;
888+
889+
log::error!(
890+
"Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
891+
&url,
892+
response_text
893+
);
894+
895+
return Err(StreamError::Anyhow(anyhow::anyhow!(
896+
"Request failed with status: {}",
897+
status,
898+
)));
899+
}
900+
901+
Ok(())
902+
}

server/src/handlers/http/ingest.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::event::{
2626
error::EventError,
2727
format::{self, EventFormat},
2828
};
29+
use crate::handlers::http::cluster::forward_create_stream_request;
2930
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
3031
use crate::localcache::CacheError;
3132
use crate::metadata::error::stream_info::MetadataError;
@@ -210,11 +211,16 @@ pub async fn create_stream_if_not_exists(
210211
if !streams.contains(&LogStream {
211212
name: stream_name.to_owned(),
212213
}) {
213-
log::error!("Stream {} not found", stream_name);
214-
return Err(PostError::Invalid(anyhow::anyhow!(
215-
"Stream `{}` not found. Please create it using the Query server.",
216-
stream_name
217-
)));
214+
match forward_create_stream_request(stream_name).await {
215+
Ok(()) => log::info!("Stream {} created", stream_name),
216+
Err(e) => {
217+
return Err(PostError::Invalid(anyhow::anyhow!(
218+
"Unable to create stream: {} using query server. Error: {}",
219+
stream_name,
220+
e.to_string(),
221+
)))
222+
}
223+
};
218224
}
219225
metadata::STREAM_INFO
220226
.upsert_stream_info(

server/src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ use actix_web::{web, HttpRequest, Responder};
44
use bytes::Bytes;
55
use chrono::Utc;
66
use http::StatusCode;
7+
use tokio::sync::Mutex;
8+
9+
static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
710

811
use crate::{
912
event,
@@ -77,6 +80,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
7780
pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
7881
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
7982

83+
let _ = CREATE_STREAM_LOCK.lock().await;
8084
let headers = create_update_stream(&req, &body, &stream_name).await?;
8185
sync_streams_with_ingestors(headers, body, &stream_name).await?;
8286

server/src/migration.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ pub async fn run_metadata_migration(
7878
let metadata = metadata_migration::v3_v4(storage_metadata);
7979
put_remote_metadata(&*object_store, &metadata).await?;
8080
}
81+
Some("v4") => {
82+
let metadata = metadata_migration::v4_v5(storage_metadata);
83+
put_remote_metadata(&*object_store, &metadata).await?;
84+
}
8185
_ => (),
8286
}
8387
}

server/src/migration/metadata_migration.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*
1717
*/
1818

19+
use base64::Engine;
1920
use rand::distributions::DistString;
2021
use serde_json::{Map, Value as JsonValue};
2122

@@ -148,6 +149,55 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
148149
storage_metadata
149150
}
150151

152+
// maybe rename
153+
pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
154+
let metadata = storage_metadata.as_object_mut().unwrap();
155+
metadata.remove_entry("version");
156+
metadata.insert("version".to_string(), JsonValue::String("v5".to_string()));
157+
158+
match metadata.get("server_mode") {
159+
None => {
160+
metadata.insert(
161+
"server_mode".to_string(),
162+
JsonValue::String(CONFIG.parseable.mode.to_string()),
163+
);
164+
}
165+
Some(JsonValue::String(mode)) => match mode.as_str() {
166+
"Query" => {
167+
metadata.insert(
168+
"querier_endpoint".to_string(),
169+
JsonValue::String(CONFIG.parseable.address.clone()),
170+
);
171+
}
172+
"All" => {
173+
metadata.insert(
174+
"server_mode".to_string(),
175+
JsonValue::String(CONFIG.parseable.mode.to_string()),
176+
);
177+
metadata.insert(
178+
"querier_endpoint".to_string(),
179+
JsonValue::String(CONFIG.parseable.address.clone()),
180+
);
181+
}
182+
_ => (),
183+
},
184+
_ => (),
185+
}
186+
187+
metadata.insert(
188+
"querier_auth_token".to_string(),
189+
JsonValue::String(format!(
190+
"Basic {}",
191+
base64::prelude::BASE64_STANDARD.encode(format!(
192+
"{}:{}",
193+
CONFIG.parseable.username, CONFIG.parseable.password
194+
))
195+
)),
196+
);
197+
198+
storage_metadata
199+
}
200+
151201
pub async fn migrate_ingester_metadata() -> anyhow::Result<Option<IngestorMetadata>> {
152202
let imp = ingestor_metadata_path(None);
153203
let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {

server/src/storage.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ pub use localfs::FSConfig;
4040
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
4141
pub use s3::S3Config;
4242
pub use store_metadata::{
43-
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
43+
get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
44+
StorageMetadata,
4445
};
4546

4647
// metadata file names in a Stream prefix

server/src/storage/store_metadata.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ pub struct StorageMetadata {
6363
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
6464
#[serde(default)]
6565
pub default_role: Option<String>,
66+
pub querier_endpoint: Option<String>,
67+
pub querier_auth_token: Option<String>,
6668
}
6769

6870
impl StorageMetadata {
@@ -78,6 +80,8 @@ impl StorageMetadata {
7880
streams: Vec::new(),
7981
roles: HashMap::default(),
8082
default_role: None,
83+
querier_endpoint: None,
84+
querier_auth_token: None,
8185
}
8286
}
8387

0 commit comments

Comments
 (0)