diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 58f3f79f8..a4720506f 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -96,8 +96,8 @@ pub async fn sync_streams_with_ingestors( stream_name ); - // roll back the stream creation - send_stream_rollback_request(&url, ingestor.clone()).await?; + // delete the stream + send_stream_delete_request(&url, ingestor.clone()).await?; } // this might be a bit too much @@ -188,15 +188,13 @@ async fn send_stream_sync_request( } /// send a rollback request to all ingestors -#[allow(dead_code)] -async fn send_stream_rollback_request( +pub async fn send_stream_delete_request( url: &str, ingestor: IngestorMetadata, ) -> Result<(), StreamError> { if !utils::check_liveness(&ingestor.domain_name).await { return Ok(()); } - let client = reqwest::Client::new(); let resp = client .delete(url) @@ -207,7 +205,7 @@ async fn send_stream_rollback_request( .map_err(|err| { // log the error and return a custom error log::error!( - "Fatal: failed to rollback stream creation: {}\n Error: {:?}", + "Fatal: failed to delete stream: {}\n Error: {:?}", ingestor.domain_name, err ); @@ -218,18 +216,10 @@ async fn send_stream_rollback_request( // this could be a bit too much, but we need to be sure it covers all cases if !resp.status().is_success() { log::error!( - "failed to rollback stream creation: {}\nResponse Returned: {:?}", + "failed to delete stream: {}\nResponse Returned: {:?}", ingestor.domain_name, resp ); - return Err(StreamError::Custom { - msg: format!( - "failed to rollback stream creation: {}\nResponse Returned: {:?}", - ingestor.domain_name, - resp.text().await.unwrap_or_default() - ), - status: StatusCode::INTERNAL_SERVER_ERROR, - }); } Ok(()) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 827e0729e..e244b1d02 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -26,6 +26,7 @@ use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; +use super::base_path_without_preceding_slash; use super::cluster::fetch_stats_from_ingestors; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use actix_web::http::StatusCode; @@ -40,28 +41,49 @@ use std::sync::Arc; pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } + match CONFIG.parseable.mode { + Mode::Query | Mode::All => { + let objectstore = CONFIG.storage().get_object_store(); + + objectstore.delete_stream(&stream_name).await?; + let stream_dir = StorageDir::new(&stream_name); + if fs::remove_dir_all(&stream_dir.data_path).is_err() { + log::warn!( + "failed to delete local data for stream {}. Clean {} manually", + stream_name, + stream_dir.data_path.to_string_lossy() + ) + } + + let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + StreamError::from(err) + })?; + + for ingestor in ingestor_metadata { + let url = format!( + "{}{}/logstream/{}", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + // delete the stream + super::cluster::send_stream_delete_request(&url, ingestor.clone()).await?; + } + } + _ => {} + } - let objectstore = CONFIG.storage().get_object_store(); - objectstore.delete_stream(&stream_name).await?; metadata::STREAM_INFO.delete_stream(&stream_name); event::STREAM_WRITERS.delete_stream(&stream_name); stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) }); - let stream_dir = StorageDir::new(&stream_name); - if fs::remove_dir_all(&stream_dir.data_path).is_err() { - log::warn!( - "failed to delete local data for stream {}. Clean {} manually", - stream_name, - stream_dir.data_path.to_string_lossy() - ) - } - Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 0b565cee1..ac92c7ca1 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -157,6 +157,13 @@ impl IngestServer { fn logstream_api() -> Scope { web::scope("/logstream").service( web::scope("/{logstream}") + .service( + web::resource("").route( + web::delete() + .to(logstream::delete) + .authorize_for_stream(Action::DeleteStream), + ), + ) .service( // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream web::resource("/stats").route(