Skip to content

Commit fb8a105

Browse files
fix: add delete stream API in distributed mode (#766)
with this PR, when delete stream is called, querier deletes the stream folder from the storage then calls delete stream API for each ingestor. Finally, ingestor deletes the stream from its local map
1 parent 7e708bb commit fb8a105

File tree

3 files changed

+46
-27
lines changed

3 files changed

+46
-27
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ pub async fn sync_streams_with_ingestors(
9696
stream_name
9797
);
9898

99-
// roll back the stream creation
100-
send_stream_rollback_request(&url, ingestor.clone()).await?;
99+
// delete the stream
100+
send_stream_delete_request(&url, ingestor.clone()).await?;
101101
}
102102

103103
// this might be a bit too much
@@ -188,15 +188,13 @@ async fn send_stream_sync_request(
188188
}
189189

190190
/// send a rollback request to all ingestors
191-
#[allow(dead_code)]
192-
async fn send_stream_rollback_request(
191+
pub async fn send_stream_delete_request(
193192
url: &str,
194193
ingestor: IngestorMetadata,
195194
) -> Result<(), StreamError> {
196195
if !utils::check_liveness(&ingestor.domain_name).await {
197196
return Ok(());
198197
}
199-
200198
let client = reqwest::Client::new();
201199
let resp = client
202200
.delete(url)
@@ -207,7 +205,7 @@ async fn send_stream_rollback_request(
207205
.map_err(|err| {
208206
// log the error and return a custom error
209207
log::error!(
210-
"Fatal: failed to rollback stream creation: {}\n Error: {:?}",
208+
"Fatal: failed to delete stream: {}\n Error: {:?}",
211209
ingestor.domain_name,
212210
err
213211
);
@@ -218,18 +216,10 @@ async fn send_stream_rollback_request(
218216
// this could be a bit too much, but we need to be sure it covers all cases
219217
if !resp.status().is_success() {
220218
log::error!(
221-
"failed to rollback stream creation: {}\nResponse Returned: {:?}",
219+
"failed to delete stream: {}\nResponse Returned: {:?}",
222220
ingestor.domain_name,
223221
resp
224222
);
225-
return Err(StreamError::Custom {
226-
msg: format!(
227-
"failed to rollback stream creation: {}\nResponse Returned: {:?}",
228-
ingestor.domain_name,
229-
resp.text().await.unwrap_or_default()
230-
),
231-
status: StatusCode::INTERNAL_SERVER_ERROR,
232-
});
233223
}
234224

235225
Ok(())

server/src/handlers/http/logstream.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
2626
use crate::{catalog, event, stats};
2727
use crate::{metadata, validator};
2828

29+
use super::base_path_without_preceding_slash;
2930
use super::cluster::fetch_stats_from_ingestors;
3031
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
3132
use actix_web::http::StatusCode;
@@ -40,28 +41,49 @@ use std::sync::Arc;
4041

4142
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
4243
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
43-
4444
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
4545
return Err(StreamError::StreamNotFound(stream_name));
4646
}
47+
match CONFIG.parseable.mode {
48+
Mode::Query | Mode::All => {
49+
let objectstore = CONFIG.storage().get_object_store();
50+
51+
objectstore.delete_stream(&stream_name).await?;
52+
let stream_dir = StorageDir::new(&stream_name);
53+
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
54+
log::warn!(
55+
"failed to delete local data for stream {}. Clean {} manually",
56+
stream_name,
57+
stream_dir.data_path.to_string_lossy()
58+
)
59+
}
60+
61+
let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
62+
log::error!("Fatal: failed to get ingestor info: {:?}", err);
63+
StreamError::from(err)
64+
})?;
65+
66+
for ingestor in ingestor_metadata {
67+
let url = format!(
68+
"{}{}/logstream/{}",
69+
ingestor.domain_name,
70+
base_path_without_preceding_slash(),
71+
stream_name
72+
);
73+
74+
// delete the stream
75+
super::cluster::send_stream_delete_request(&url, ingestor.clone()).await?;
76+
}
77+
}
78+
_ => {}
79+
}
4780

48-
let objectstore = CONFIG.storage().get_object_store();
49-
objectstore.delete_stream(&stream_name).await?;
5081
metadata::STREAM_INFO.delete_stream(&stream_name);
5182
event::STREAM_WRITERS.delete_stream(&stream_name);
5283
stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| {
5384
log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e)
5485
});
5586

56-
let stream_dir = StorageDir::new(&stream_name);
57-
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
58-
log::warn!(
59-
"failed to delete local data for stream {}. Clean {} manually",
60-
stream_name,
61-
stream_dir.data_path.to_string_lossy()
62-
)
63-
}
64-
6587
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
6688
}
6789

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,13 @@ impl IngestServer {
157157
fn logstream_api() -> Scope {
158158
web::scope("/logstream").service(
159159
web::scope("/{logstream}")
160+
.service(
161+
web::resource("").route(
162+
web::delete()
163+
.to(logstream::delete)
164+
.authorize_for_stream(Action::DeleteStream),
165+
),
166+
)
160167
.service(
161168
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
162169
web::resource("/stats").route(

0 commit comments

Comments
 (0)