Skip to content

Commit b4923db

Browse files
committed
updates
1 parent 8c4f6fc commit b4923db

22 files changed

+284
-307
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -59,46 +59,6 @@ pub const INTERNAL_STREAM_NAME: &str = "pmeta";
5959

6060
const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
6161

62-
pub async fn sync_cache_with_ingestors(
63-
url: &str,
64-
ingestor: IngestorMetadata,
65-
body: bool,
66-
) -> Result<(), StreamError> {
67-
if !utils::check_liveness(&ingestor.domain_name).await {
68-
return Ok(());
69-
}
70-
let request_body: Bytes = Bytes::from(body.to_string());
71-
let client = reqwest::Client::new();
72-
let resp = client
73-
.put(url)
74-
.header(header::CONTENT_TYPE, "application/json")
75-
.header(header::AUTHORIZATION, ingestor.token)
76-
.body(request_body)
77-
.send()
78-
.await
79-
.map_err(|err| {
80-
// log the error and return a custom error
81-
log::error!(
82-
"Fatal: failed to set cache: {}\n Error: {:?}",
83-
ingestor.domain_name,
84-
err
85-
);
86-
StreamError::Network(err)
87-
})?;
88-
89-
// if the response is not successful, log the error and return a custom error
90-
// this could be a bit too much, but we need to be sure it covers all cases
91-
if !resp.status().is_success() {
92-
log::error!(
93-
"failed to set cache: {}\nResponse Returned: {:?}",
94-
ingestor.domain_name,
95-
resp.text().await
96-
);
97-
}
98-
99-
Ok(())
100-
}
101-
10262
// forward the create/update stream request to all ingestors to keep them in sync
10363
pub async fn sync_streams_with_ingestors(
10464
headers: HeaderMap,
@@ -122,7 +82,7 @@ pub async fn sync_streams_with_ingestors(
12282
continue;
12383
}
12484
let url = format!(
125-
"{}{}/logstream/{}",
85+
"{}{}/logstream/{}/sync",
12686
ingestor.domain_name,
12787
base_path_without_preceding_slash(),
12888
stream_name
@@ -176,7 +136,7 @@ pub async fn sync_users_with_roles_with_ingestors(
176136
continue;
177137
}
178138
let url = format!(
179-
"{}{}/user/{}/role",
139+
"{}{}/user/{}/role/sync",
180140
ingestor.domain_name,
181141
base_path_without_preceding_slash(),
182142
username
@@ -224,7 +184,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
224184
continue;
225185
}
226186
let url = format!(
227-
"{}{}/user/{}",
187+
"{}{}/user/{}/sync",
228188
ingestor.domain_name,
229189
base_path_without_preceding_slash(),
230190
username
@@ -285,7 +245,7 @@ pub async fn sync_user_creation_with_ingestors(
285245
continue;
286246
}
287247
let url = format!(
288-
"{}{}/user/{}",
248+
"{}{}/user/{}/sync",
289249
ingestor.domain_name,
290250
base_path_without_preceding_slash(),
291251
username
@@ -333,7 +293,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
333293
continue;
334294
}
335295
let url = format!(
336-
"{}{}/user/{}/generate-new-password",
296+
"{}{}/user/{}/generate-new-password/sync",
337297
ingestor.domain_name,
338298
base_path_without_preceding_slash(),
339299
username
@@ -389,7 +349,7 @@ pub async fn sync_role_update_with_ingestors(
389349
continue;
390350
}
391351
let url = format!(
392-
"{}{}/role/{}",
352+
"{}{}/role/{}/sync",
393353
ingestor.domain_name,
394354
base_path_without_preceding_slash(),
395355
name

server/src/handlers/http/ingest.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,15 @@
1818

1919
use super::logstream::error::{CreateStreamError, StreamError};
2020
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
21+
use super::otel;
2122
use super::users::dashboards::DashboardError;
2223
use super::users::filters::FiltersError;
23-
use super::otel;
2424
use crate::event::{
2525
self,
2626
error::EventError,
2727
format::{self, EventFormat},
2828
};
29-
use crate::handlers::{
30-
LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY,
31-
};
29+
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
3230
use crate::localcache::CacheError;
3331
use crate::metadata::error::stream_info::MetadataError;
3432
use crate::metadata::{self, STREAM_INFO};
@@ -141,7 +139,6 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
141139
Ok(HttpResponse::Ok().finish())
142140
}
143141

144-
145142
// Handler for POST /api/v1/logstream/{logstream}
146143
// only ingests events into the specified logstream
147144
// fails if the logstream does not exist

server/src/handlers/http/logstream.rs

Lines changed: 12 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,20 @@
1717
*/
1818

1919
use self::error::{CreateStreamError, StreamError};
20-
use super::base_path_without_preceding_slash;
2120
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
22-
use super::cluster::{
23-
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
24-
INTERNAL_STREAM_NAME,
25-
};
21+
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
2622
use super::ingest::create_stream_if_not_exists;
2723
use super::modal::utils::logstream_utils::create_update_stream;
2824
use crate::alerts::Alerts;
2925
use crate::handlers::STREAM_TYPE_KEY;
30-
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
26+
use crate::hottier::HotTierManager;
3127
use crate::metadata::STREAM_INFO;
3228
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
33-
use crate::option::{Mode, CONFIG};
29+
use crate::option::CONFIG;
3430
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3531
use crate::storage::StreamType;
3632
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
37-
use crate::{
38-
catalog::{self, remove_manifest_from_snapshot},
39-
event, stats,
40-
};
33+
use crate::{catalog, event, stats};
4134

4235
use crate::{metadata, validator};
4336
use actix_web::http::header::{self, HeaderMap};
@@ -47,7 +40,6 @@ use arrow_schema::{Field, Schema};
4740
use bytes::Bytes;
4841
use chrono::Utc;
4942
use http::{HeaderName, HeaderValue};
50-
use itertools::Itertools;
5143
use serde_json::Value;
5244
use std::collections::HashMap;
5345
use std::fs;
@@ -59,7 +51,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
5951
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
6052
return Err(StreamError::StreamNotFound(stream_name));
6153
}
62-
54+
6355
let objectstore = CONFIG.storage().get_object_store();
6456

6557
objectstore.delete_stream(&stream_name).await?;
@@ -78,23 +70,6 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
7870
}
7971
}
8072

81-
let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
82-
log::error!("Fatal: failed to get ingestor info: {:?}", err);
83-
StreamError::from(err)
84-
})?;
85-
86-
for ingestor in ingestor_metadata {
87-
let url = format!(
88-
"{}{}/logstream/{}",
89-
ingestor.domain_name,
90-
base_path_without_preceding_slash(),
91-
stream_name
92-
);
93-
94-
// delete the stream
95-
super::cluster::send_stream_delete_request(&url, ingestor.clone()).await?;
96-
}
97-
9873
metadata::STREAM_INFO.delete_stream(&stream_name);
9974
event::STREAM_WRITERS.delete_stream(&stream_name);
10075
stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| {
@@ -104,28 +79,6 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
10479
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
10580
}
10681

107-
pub async fn retention_cleanup(
108-
req: HttpRequest,
109-
body: Bytes,
110-
) -> Result<impl Responder, StreamError> {
111-
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
112-
let storage = CONFIG.storage().get_object_store();
113-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
114-
log::error!("Stream {} not found", stream_name.clone());
115-
return Err(StreamError::StreamNotFound(stream_name.clone()));
116-
}
117-
let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
118-
let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
119-
let mut first_event_at: Option<String> = None;
120-
if let Err(err) = res {
121-
log::error!("Failed to update manifest list in the snapshot {err:?}")
122-
} else {
123-
first_event_at = res.unwrap();
124-
}
125-
126-
Ok((first_event_at, StatusCode::OK))
127-
}
128-
12982
pub async fn list(_: HttpRequest) -> impl Responder {
13083
let res: Vec<LogStream> = STREAM_INFO
13184
.list_streams()
@@ -290,13 +243,8 @@ pub async fn put_retention(
290243
pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
291244
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
292245

293-
match CONFIG.parseable.mode {
294-
Mode::Ingest | Mode::All => {
295-
if CONFIG.parseable.local_cache_path.is_none() {
296-
return Err(StreamError::CacheNotEnabled(stream_name));
297-
}
298-
}
299-
_ => {}
246+
if CONFIG.parseable.local_cache_path.is_none() {
247+
return Err(StreamError::CacheNotEnabled(stream_name));
300248
}
301249

302250
let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
@@ -310,61 +258,11 @@ pub async fn put_enable_cache(
310258
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
311259
let storage = CONFIG.storage().get_object_store();
312260

313-
match CONFIG.parseable.mode {
314-
Mode::Query => {
315-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
316-
return Err(StreamError::StreamNotFound(stream_name));
317-
}
318-
let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
319-
log::error!("Fatal: failed to get ingestor info: {:?}", err);
320-
StreamError::from(err)
321-
})?;
322-
for ingestor in ingestor_metadata {
323-
let url = format!(
324-
"{}{}/logstream/{}/cache",
325-
ingestor.domain_name,
326-
base_path_without_preceding_slash(),
327-
stream_name
328-
);
329-
330-
super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?;
331-
}
332-
}
333-
Mode::Ingest => {
334-
if CONFIG.parseable.local_cache_path.is_none() {
335-
return Err(StreamError::CacheNotEnabled(stream_name));
336-
}
337-
// here the ingest server has not found the stream
338-
// so it should check if the stream exists in storage
339-
let check = storage
340-
.list_streams()
341-
.await?
342-
.iter()
343-
.map(|stream| stream.name.clone())
344-
.contains(&stream_name);
345-
346-
if !check {
347-
log::error!("Stream {} not found", stream_name.clone());
348-
return Err(StreamError::StreamNotFound(stream_name.clone()));
349-
}
350-
metadata::STREAM_INFO
351-
.upsert_stream_info(
352-
&*storage,
353-
LogStream {
354-
name: stream_name.clone().to_owned(),
355-
},
356-
)
357-
.await
358-
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
359-
}
360-
Mode::All => {
361-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
362-
return Err(StreamError::StreamNotFound(stream_name));
363-
}
364-
if CONFIG.parseable.local_cache_path.is_none() {
365-
return Err(StreamError::CacheNotEnabled(stream_name));
366-
}
367-
}
261+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
262+
return Err(StreamError::StreamNotFound(stream_name));
263+
}
264+
if CONFIG.parseable.local_cache_path.is_none() {
265+
return Err(StreamError::CacheNotEnabled(stream_name));
368266
}
369267
let enable_cache = body.into_inner();
370268
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
@@ -614,34 +512,6 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
614512
Ok((web::Json(stream_info), StatusCode::OK))
615513
}
616514

617-
pub async fn put_stream_hot_tier(
618-
_req: HttpRequest,
619-
_body: web::Json<serde_json::Value>,
620-
) -> Result<(), StreamError> {
621-
Err(StreamError::Custom {
622-
msg: "Hot tier can only be enabled in query mode".to_string(),
623-
status: StatusCode::BAD_REQUEST,
624-
})
625-
}
626-
627-
pub async fn get_stream_hot_tier(
628-
_req: HttpRequest
629-
) -> Result<(), StreamError> {
630-
Err(StreamError::Custom {
631-
msg: "Hot tier can only be enabled in query mode".to_string(),
632-
status: StatusCode::BAD_REQUEST,
633-
})
634-
}
635-
636-
pub async fn delete_stream_hot_tier(
637-
_req: HttpRequest
638-
) -> Result<(), StreamError> {
639-
Err(StreamError::Custom {
640-
msg: "Hot tier can only be enabled in query mode".to_string(),
641-
status: StatusCode::BAD_REQUEST,
642-
})
643-
}
644-
645515
pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
646516
if let Ok(stream_exists) =
647517
create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await

0 commit comments

Comments
 (0)