Commit 064d749 ("updates")
1 parent: 8c4f6fc
22 files changed: +207 additions, -259 deletions

server/src/handlers/http/cluster/mod.rs

Lines changed: 6 additions & 46 deletions
@@ -59,46 +59,6 @@ pub const INTERNAL_STREAM_NAME: &str = "pmeta";
 
 const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
 
-pub async fn sync_cache_with_ingestors(
-    url: &str,
-    ingestor: IngestorMetadata,
-    body: bool,
-) -> Result<(), StreamError> {
-    if !utils::check_liveness(&ingestor.domain_name).await {
-        return Ok(());
-    }
-    let request_body: Bytes = Bytes::from(body.to_string());
-    let client = reqwest::Client::new();
-    let resp = client
-        .put(url)
-        .header(header::CONTENT_TYPE, "application/json")
-        .header(header::AUTHORIZATION, ingestor.token)
-        .body(request_body)
-        .send()
-        .await
-        .map_err(|err| {
-            // log the error and return a custom error
-            log::error!(
-                "Fatal: failed to set cache: {}\n Error: {:?}",
-                ingestor.domain_name,
-                err
-            );
-            StreamError::Network(err)
-        })?;
-
-    // if the response is not successful, log the error and return a custom error
-    // this could be a bit too much, but we need to be sure it covers all cases
-    if !resp.status().is_success() {
-        log::error!(
-            "failed to set cache: {}\nResponse Returned: {:?}",
-            ingestor.domain_name,
-            resp.text().await
-        );
-    }
-
-    Ok(())
-}
-
 // forward the create/update stream request to all ingestors to keep them in sync
 pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
@@ -122,7 +82,7 @@ pub async fn sync_streams_with_ingestors(
             continue;
         }
         let url = format!(
-            "{}{}/logstream/{}",
+            "{}{}/logstream/{}/sync",
             ingestor.domain_name,
             base_path_without_preceding_slash(),
             stream_name
@@ -176,7 +136,7 @@ pub async fn sync_users_with_roles_with_ingestors(
             continue;
         }
         let url = format!(
-            "{}{}/user/{}/role",
+            "{}{}/user/{}/role/sync",
             ingestor.domain_name,
             base_path_without_preceding_slash(),
             username
@@ -224,7 +184,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
            continue;
         }
         let url = format!(
-            "{}{}/user/{}",
+            "{}{}/user/{}/sync",
             ingestor.domain_name,
             base_path_without_preceding_slash(),
             username
@@ -285,7 +245,7 @@ pub async fn sync_user_creation_with_ingestors(
            continue;
         }
         let url = format!(
-            "{}{}/user/{}",
+            "{}{}/user/{}/sync",
             ingestor.domain_name,
             base_path_without_preceding_slash(),
             username
@@ -333,7 +293,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
            continue;
         }
         let url = format!(
-            "{}{}/user/{}/generate-new-password",
+            "{}{}/user/{}/generate-new-password/sync",
            ingestor.domain_name,
            base_path_without_preceding_slash(),
            username
@@ -389,7 +349,7 @@ pub async fn sync_role_update_with_ingestors(
            continue;
        }
        let url = format!(
-            "{}{}/role/{}",
+            "{}{}/role/{}/sync",
            ingestor.domain_name,
            base_path_without_preceding_slash(),
            name
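
Note: the sync_* helpers in this module all follow the same per-ingestor forwarding pattern that the removed sync_cache_with_ingestors used: skip ingestors that fail the liveness check, build the ingestor-local URL (now suffixed with /sync), issue an authenticated PUT, and log rather than fail on a non-success response. A minimal sketch of that pattern, reconstructed from the removed function; the name forward_to_ingestor is hypothetical and the exact field and error types are assumptions:

    // Sketch only: the per-ingestor forwarding pattern shared by the sync_* helpers.
    // `IngestorMetadata`, `utils::check_liveness`, and `StreamError::Network` are the
    // items visible in the diff above; the function name is hypothetical.
    async fn forward_to_ingestor(
        url: &str,
        ingestor: &IngestorMetadata,
        body: bytes::Bytes,
    ) -> Result<(), StreamError> {
        // Skip ingestors that are not live instead of failing the whole sync.
        if !utils::check_liveness(&ingestor.domain_name).await {
            return Ok(());
        }
        let resp = reqwest::Client::new()
            .put(url) // e.g. "{domain}{base_path}/logstream/{stream}/sync"
            .header(reqwest::header::CONTENT_TYPE, "application/json")
            .header(reqwest::header::AUTHORIZATION, ingestor.token.as_str())
            .body(body)
            .send()
            .await
            .map_err(StreamError::Network)?;
        if !resp.status().is_success() {
            // Log and continue; one unreachable ingestor should not abort the sync.
            log::error!("sync to {} failed: {:?}", ingestor.domain_name, resp.status());
        }
        Ok(())
    }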

server/src/handlers/http/ingest.rs

Lines changed: 2 additions & 5 deletions
@@ -18,17 +18,15 @@
 
 use super::logstream::error::{CreateStreamError, StreamError};
 use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
+use super::otel;
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
-use super::otel;
 use crate::event::{
     self,
     error::EventError,
     format::{self, EventFormat},
 };
-use crate::handlers::{
-    LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY,
-};
+use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
 use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::{self, STREAM_INFO};
@@ -141,7 +139,6 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
     Ok(HttpResponse::Ok().finish())
 }
 
-
 // Handler for POST /api/v1/logstream/{logstream}
 // only ingests events into the specified logstream
 // fails if the logstream does not exist

server/src/handlers/http/logstream.rs

Lines changed: 12 additions & 99 deletions
@@ -19,18 +19,15 @@
 use self::error::{CreateStreamError, StreamError};
 use super::base_path_without_preceding_slash;
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
-use super::cluster::{
-    fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
-    INTERNAL_STREAM_NAME,
-};
+use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
 use super::ingest::create_stream_if_not_exists;
 use super::modal::utils::logstream_utils::create_update_stream;
 use crate::alerts::Alerts;
 use crate::handlers::STREAM_TYPE_KEY;
-use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
+use crate::hottier::HotTierManager;
 use crate::metadata::STREAM_INFO;
 use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
-use crate::option::{Mode, CONFIG};
+use crate::option::CONFIG;
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::StreamType;
 use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
@@ -47,7 +44,6 @@ use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::Utc;
 use http::{HeaderName, HeaderValue};
-use itertools::Itertools;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::fs;
@@ -59,7 +55,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     if !metadata::STREAM_INFO.stream_exists(&stream_name) {
         return Err(StreamError::StreamNotFound(stream_name));
     }
-
+
     let objectstore = CONFIG.storage().get_object_store();
 
     objectstore.delete_stream(&stream_name).await?;
@@ -85,7 +81,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
 
     for ingestor in ingestor_metadata {
         let url = format!(
-            "{}{}/logstream/{}",
+            "{}{}/logstream/{}/sync",
             ingestor.domain_name,
             base_path_without_preceding_slash(),
             stream_name
@@ -290,13 +286,8 @@ pub async fn put_retention(
 pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    match CONFIG.parseable.mode {
-        Mode::Ingest | Mode::All => {
-            if CONFIG.parseable.local_cache_path.is_none() {
-                return Err(StreamError::CacheNotEnabled(stream_name));
-            }
-        }
-        _ => {}
+    if CONFIG.parseable.local_cache_path.is_none() {
+        return Err(StreamError::CacheNotEnabled(stream_name));
     }
 
     let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
@@ -310,61 +301,11 @@ pub async fn put_enable_cache(
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let storage = CONFIG.storage().get_object_store();
 
-    match CONFIG.parseable.mode {
-        Mode::Query => {
-            if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-                return Err(StreamError::StreamNotFound(stream_name));
-            }
-            let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
-                log::error!("Fatal: failed to get ingestor info: {:?}", err);
-                StreamError::from(err)
-            })?;
-            for ingestor in ingestor_metadata {
-                let url = format!(
-                    "{}{}/logstream/{}/cache",
-                    ingestor.domain_name,
-                    base_path_without_preceding_slash(),
-                    stream_name
-                );
-
-                super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?;
-            }
-        }
-        Mode::Ingest => {
-            if CONFIG.parseable.local_cache_path.is_none() {
-                return Err(StreamError::CacheNotEnabled(stream_name));
-            }
-            // here the ingest server has not found the stream
-            // so it should check if the stream exists in storage
-            let check = storage
-                .list_streams()
-                .await?
-                .iter()
-                .map(|stream| stream.name.clone())
-                .contains(&stream_name);
-
-            if !check {
-                log::error!("Stream {} not found", stream_name.clone());
-                return Err(StreamError::StreamNotFound(stream_name.clone()));
-            }
-            metadata::STREAM_INFO
-                .upsert_stream_info(
-                    &*storage,
-                    LogStream {
-                        name: stream_name.clone().to_owned(),
-                    },
-                )
-                .await
-                .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
-        }
-        Mode::All => {
-            if !metadata::STREAM_INFO.stream_exists(&stream_name) {
-                return Err(StreamError::StreamNotFound(stream_name));
-            }
-            if CONFIG.parseable.local_cache_path.is_none() {
-                return Err(StreamError::CacheNotEnabled(stream_name));
-            }
-        }
+    if !metadata::STREAM_INFO.stream_exists(&stream_name) {
+        return Err(StreamError::StreamNotFound(stream_name));
+    }
+    if CONFIG.parseable.local_cache_path.is_none() {
+        return Err(StreamError::CacheNotEnabled(stream_name));
     }
     let enable_cache = body.into_inner();
     let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
@@ -614,34 +555,6 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
     Ok((web::Json(stream_info), StatusCode::OK))
 }
 
-pub async fn put_stream_hot_tier(
-    _req: HttpRequest,
-    _body: web::Json<serde_json::Value>,
-) -> Result<(), StreamError> {
-    Err(StreamError::Custom {
-        msg: "Hot tier can only be enabled in query mode".to_string(),
-        status: StatusCode::BAD_REQUEST,
-    })
-}
-
-pub async fn get_stream_hot_tier(
-    _req: HttpRequest
-) -> Result<(), StreamError> {
-    Err(StreamError::Custom {
-        msg: "Hot tier can only be enabled in query mode".to_string(),
-        status: StatusCode::BAD_REQUEST,
-    })
-}
-
-pub async fn delete_stream_hot_tier(
-    _req: HttpRequest
-) -> Result<(), StreamError> {
-    Err(StreamError::Custom {
-        msg: "Hot tier can only be enabled in query mode".to_string(),
-        status: StatusCode::BAD_REQUEST,
-    })
-}
-
 pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
     if let Ok(stream_exists) =
         create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
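
Note: with the Mode match gone, enabling the cache on this path reduces to two guards followed by a metadata update. A condensed sketch of the resulting flow, assuming the guards shown above; persisting the updated format and refreshing STREAM_INFO is assumed to follow, as in the ingester handler later in this commit:

    // Sketch of the simplified cache toggle, not the exact handler body.
    pub async fn put_enable_cache(
        req: HttpRequest,
        body: web::Json<bool>,
    ) -> Result<impl Responder, StreamError> {
        let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
        let storage = CONFIG.storage().get_object_store();

        // Reject unknown streams and nodes with no local cache path configured.
        if !metadata::STREAM_INFO.stream_exists(&stream_name) {
            return Err(StreamError::StreamNotFound(stream_name));
        }
        if CONFIG.parseable.local_cache_path.is_none() {
            return Err(StreamError::CacheNotEnabled(stream_name));
        }

        // Flip the flag on the stored stream format; writing it back to object
        // storage and updating STREAM_INFO is assumed to follow.
        let enable_cache = body.into_inner();
        let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
        stream_metadata.cache_enabled = enable_cache;
        Ok(HttpResponse::Ok().finish())
    }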

server/src/handlers/http/modal/ingest/ingester_logstream.rs

Lines changed: 12 additions & 4 deletions
@@ -3,7 +3,16 @@ use bytes::Bytes;
 use http::StatusCode;
 use itertools::Itertools;
 
-use crate::{event, handlers::http::{logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream}, metadata::{self, STREAM_INFO}, option::CONFIG, stats, storage::LogStream};
+use crate::{
+    event,
+    handlers::http::{
+        logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream,
+    },
+    metadata::{self, STREAM_INFO},
+    option::CONFIG,
+    stats,
+    storage::LogStream,
+};
 
 pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -35,7 +44,6 @@ pub async fn put_enable_cache(
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let storage = CONFIG.storage().get_object_store();
 
-
     if CONFIG.parseable.local_cache_path.is_none() {
         return Err(StreamError::CacheNotEnabled(stream_name));
     }
@@ -61,7 +69,7 @@ pub async fn put_enable_cache(
         )
         .await
         .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
-
+
     let enable_cache = body.into_inner();
     let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
     stream_metadata.cache_enabled = enable_cache;
@@ -85,4 +93,4 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, Strea
 
     let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
     Ok((web::Json(cache_enabled), StatusCode::OK))
-}
+}

server/src/handlers/http/modal/ingest/ingester_rbac.rs

Lines changed: 10 additions & 4 deletions
@@ -3,8 +3,14 @@ use std::collections::HashSet;
 use actix_web::{web, Responder};
 use tokio::sync::Mutex;
 
-use crate::{handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError}, rbac::{user::{self, User as ParseableUser}, Users}, storage};
-
+use crate::{
+    handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError},
+    rbac::{
+        user::{self, User as ParseableUser},
+        Users,
+    },
+    storage,
+};
 
 // async aware lock for updating storage metadata and user map atomicically
 static UPDATE_LOCK: Mutex<()> = Mutex::const_new(());
@@ -87,7 +93,7 @@ pub async fn post_gen_password(username: web::Path<String>) -> Result<impl Respo
     let username = username.into_inner();
     let mut new_hash = String::default();
     let mut metadata = get_metadata().await?;
-
+
     let _ = storage::put_staging_metadata(&metadata);
     if let Some(user) = metadata
         .users
@@ -105,4 +111,4 @@ pub async fn post_gen_password(username: web::Path<String>) -> Result<impl Respo
     Users.change_password_hash(&username, &new_hash);
 
     Ok("Updated")
-}
+}
Lines changed: 7 additions & 5 deletions
@@ -1,8 +1,11 @@
 use actix_web::{web, HttpResponse, Responder};
 use bytes::Bytes;
 
-use crate::{handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError}, rbac::{map::mut_roles, role::model::DefaultPrivilege}, storage};
-
+use crate::{
+    handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError},
+    rbac::{map::mut_roles, role::model::DefaultPrivilege},
+    storage,
+};
 
 // Handler for PUT /api/v1/role/{name}
 // Creates a new role or update existing one
@@ -11,10 +14,9 @@ pub async fn put(name: web::Path<String>, body: Bytes) -> Result<impl Responder,
     let privileges = serde_json::from_slice::<Vec<DefaultPrivilege>>(&body)?;
     let mut metadata = get_metadata().await?;
     metadata.roles.insert(name.clone(), privileges.clone());
-
+
     let _ = storage::put_staging_metadata(&metadata);
     mut_roles().insert(name.clone(), privileges.clone());
 
-
     Ok(HttpResponse::Ok().finish())
-}
+}
