Skip to content

Commit c79b255

Browse files
fix for retention and get stream info (#771)
for distributed and standalone deployments
1 parent bb849d1 commit c79b255

File tree

6 files changed

+209
-58
lines changed

6 files changed

+209
-58
lines changed

server/src/catalog.rs

Lines changed: 88 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818

1919
use std::{io::ErrorKind, sync::Arc};
2020

21+
use self::{column::Column, snapshot::ManifestItem};
22+
use crate::handlers::http::base_path_without_preceding_slash;
23+
use crate::option::CONFIG;
2124
use crate::{
2225
catalog::manifest::Manifest,
2326
query::PartialTimeFilter,
2427
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
2528
};
29+
use crate::{handlers, Mode};
30+
use bytes::Bytes;
2631
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
2732
use relative_path::RelativePathBuf;
2833
use std::io::Error as IOError;
29-
30-
use self::{column::Column, snapshot::ManifestItem};
31-
3234
pub mod column;
3335
pub mod manifest;
3436
pub mod snapshot;
@@ -208,51 +210,99 @@ pub async fn remove_manifest_from_snapshot(
208210
storage: Arc<dyn ObjectStorage + Send>,
209211
stream_name: &str,
210212
dates: Vec<String>,
211-
) -> Result<(), ObjectStorageError> {
212-
// get current snapshot
213-
let mut meta = storage.get_object_store_format(stream_name).await?;
214-
let manifests = &mut meta.snapshot.manifest_list;
213+
) -> Result<Option<String>, ObjectStorageError> {
214+
match CONFIG.parseable.mode {
215+
Mode::All | Mode::Ingest => {
216+
if !dates.is_empty() {
217+
// get current snapshot
218+
let mut meta = storage.get_object_store_format(stream_name).await?;
219+
let manifests = &mut meta.snapshot.manifest_list;
220+
// Filter out items whose manifest_path contains any of the dates_to_delete
221+
manifests
222+
.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
223+
storage.put_snapshot(stream_name, meta.snapshot).await?;
224+
}
215225

216-
// Filter out items whose manifest_path contains any of the dates_to_delete
217-
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
226+
let first_event_at = get_first_event(storage.clone(), stream_name, Vec::new()).await?;
218227

219-
storage.put_snapshot(stream_name, meta.snapshot).await?;
220-
Ok(())
228+
Ok(first_event_at)
229+
}
230+
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
231+
}
221232
}
222233

223234
pub async fn get_first_event(
224235
storage: Arc<dyn ObjectStorage + Send>,
225236
stream_name: &str,
237+
dates: Vec<String>,
226238
) -> Result<Option<String>, ObjectStorageError> {
227-
// get current snapshot
228-
let mut meta = storage.get_object_store_format(stream_name).await?;
229-
let manifests = &mut meta.snapshot.manifest_list;
230-
if manifests.is_empty() {
231-
log::info!("No manifest found for stream {stream_name}");
232-
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
233-
}
234-
235-
let manifest = &manifests[0];
236-
237-
let path = partition_path(
238-
stream_name,
239-
manifest.time_lower_bound,
240-
manifest.time_upper_bound,
241-
);
242-
let Some(manifest) = storage.get_manifest(&path).await? else {
243-
return Err(ObjectStorageError::UnhandledError(
244-
"Manifest found in snapshot but not in object-storage"
245-
.to_string()
246-
.into(),
247-
));
248-
};
239+
let mut first_event_at: String = String::default();
240+
match CONFIG.parseable.mode {
241+
Mode::All | Mode::Ingest => {
242+
// get current snapshot
243+
let mut meta = storage.get_object_store_format(stream_name).await?;
244+
let manifests = &mut meta.snapshot.manifest_list;
245+
if manifests.is_empty() {
246+
log::info!("No manifest found for stream {stream_name}");
247+
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
248+
}
249+
let manifest = &manifests[0];
250+
let path = partition_path(
251+
stream_name,
252+
manifest.time_lower_bound,
253+
manifest.time_upper_bound,
254+
);
255+
let Some(manifest) = storage.get_manifest(&path).await? else {
256+
return Err(ObjectStorageError::UnhandledError(
257+
"Manifest found in snapshot but not in object-storage"
258+
.to_string()
259+
.into(),
260+
));
261+
};
262+
if let Some(first_event) = manifest.files.first() {
263+
let (lower_bound, _) = get_file_bounds(first_event);
264+
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
265+
}
266+
}
267+
Mode::Query => {
268+
let ingestor_metadata =
269+
handlers::http::cluster::get_ingestor_info()
270+
.await
271+
.map_err(|err| {
272+
log::error!("Fatal: failed to get ingestor info: {:?}", err);
273+
ObjectStorageError::from(err)
274+
})?;
275+
let mut ingestors_first_event_at: Vec<String> = Vec::new();
276+
for ingestor in ingestor_metadata {
277+
let url = format!(
278+
"{}{}/logstream/{}/retention/cleanup",
279+
ingestor.domain_name,
280+
base_path_without_preceding_slash(),
281+
stream_name
282+
);
283+
// Convert dates vector to Bytes object
284+
let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
285+
// delete the stream
249286

250-
if let Some(first_event) = manifest.files.first() {
251-
let (lower_bound, _) = get_file_bounds(first_event);
252-
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
253-
return Ok(Some(first_event_at));
287+
let ingestor_first_event_at =
288+
handlers::http::cluster::send_retention_cleanup_request(
289+
&url,
290+
ingestor.clone(),
291+
dates_bytes,
292+
)
293+
.await?;
294+
if !ingestor_first_event_at.is_empty() {
295+
ingestors_first_event_at.push(ingestor_first_event_at);
296+
}
297+
}
298+
if ingestors_first_event_at.is_empty() {
299+
return Ok(None);
300+
}
301+
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
302+
}
254303
}
255-
Ok(None)
304+
305+
Ok(Some(first_event_at))
256306
}
257307

258308
/// Partition the path to which this manifest belongs.

server/src/handlers/http/cluster/mod.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use serde::de::Error;
4141
use serde_json::error::Error as SerdeError;
4242
use serde_json::Value as JsonValue;
4343
use url::Url;
44-
4544
type IngestorMetadataArr = Vec<IngestorMetadata>;
4645

4746
use self::utils::StorageStats;
@@ -229,7 +228,7 @@ async fn send_stream_sync_request(
229228
Ok(())
230229
}
231230

232-
/// send a rollback request to all ingestors
231+
/// send a delete stream request to all ingestors
233232
pub async fn send_stream_delete_request(
234233
url: &str,
235234
ingestor: IngestorMetadata,
@@ -267,6 +266,53 @@ pub async fn send_stream_delete_request(
267266
Ok(())
268267
}
269268

269+
/// send a retention cleanup request to all ingestors
270+
pub async fn send_retention_cleanup_request(
271+
url: &str,
272+
ingestor: IngestorMetadata,
273+
body: Bytes,
274+
) -> Result<String, ObjectStorageError> {
275+
let mut first_event_at: String = String::default();
276+
if !utils::check_liveness(&ingestor.domain_name).await {
277+
return Ok(first_event_at);
278+
}
279+
let client = reqwest::Client::new();
280+
let resp = client
281+
.post(url)
282+
.header(header::CONTENT_TYPE, "application/json")
283+
.header(header::AUTHORIZATION, ingestor.token)
284+
.body(body)
285+
.send()
286+
.await
287+
.map_err(|err| {
288+
// log the error and return a custom error
289+
log::error!(
290+
"Fatal: failed to perform cleanup on retention: {}\n Error: {:?}",
291+
ingestor.domain_name,
292+
err
293+
);
294+
ObjectStorageError::Custom(err.to_string())
295+
})?;
296+
297+
// if the response is not successful, log the error and return a custom error
298+
// this could be a bit too much, but we need to be sure it covers all cases
299+
if !resp.status().is_success() {
300+
log::error!(
301+
"failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
302+
ingestor.domain_name,
303+
resp.status()
304+
);
305+
}
306+
307+
let resp_data = resp.bytes().await.map_err(|err| {
308+
log::error!("Fatal: failed to parse response to bytes: {:?}", err);
309+
ObjectStorageError::Custom(err.to_string())
310+
})?;
311+
312+
first_event_at = String::from_utf8_lossy(&resp_data).to_string();
313+
Ok(first_event_at)
314+
}
315+
270316
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
271317
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
272318
log::error!("Fatal: failed to get ingestor info: {:?}", err);

server/src/handlers/http/logstream.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use crate::metadata::STREAM_INFO;
2626
use crate::option::{Mode, CONFIG};
2727
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
2828
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
29-
use crate::{catalog, event, stats};
29+
use crate::{
30+
catalog::{self, remove_manifest_from_snapshot},
31+
event, stats,
32+
};
3033
use crate::{metadata, validator};
3134
use actix_web::http::StatusCode;
3235
use actix_web::{web, HttpRequest, Responder};
@@ -87,6 +90,48 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
8790
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
8891
}
8992

93+
pub async fn retention_cleanup(
94+
req: HttpRequest,
95+
body: Bytes,
96+
) -> Result<impl Responder, StreamError> {
97+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
98+
let storage = CONFIG.storage().get_object_store();
99+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
100+
// here the ingest server has not found the stream
101+
// so it should check if the stream exists in storage
102+
let check = storage
103+
.list_streams()
104+
.await?
105+
.iter()
106+
.map(|stream| stream.name.clone())
107+
.contains(&stream_name);
108+
109+
if !check {
110+
log::error!("Stream {} not found", stream_name.clone());
111+
return Err(StreamError::StreamNotFound(stream_name.clone()));
112+
}
113+
metadata::STREAM_INFO
114+
.upsert_stream_info(
115+
&*storage,
116+
LogStream {
117+
name: stream_name.clone().to_owned(),
118+
},
119+
)
120+
.await
121+
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
122+
}
123+
let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
124+
let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
125+
let mut first_event_at: Option<String> = None;
126+
if let Err(err) = res {
127+
log::error!("Failed to update manifest list in the snapshot {err:?}")
128+
} else {
129+
first_event_at = res.unwrap();
130+
}
131+
132+
Ok((first_event_at, StatusCode::OK))
133+
}
134+
90135
pub async fn list(_: HttpRequest) -> impl Responder {
91136
let res: Vec<LogStream> = STREAM_INFO
92137
.list_streams()
@@ -515,7 +560,9 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
515560

516561
if first_event_at_empty(&stream_name) {
517562
let store = CONFIG.storage().get_object_store();
518-
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
563+
let dates: Vec<String> = Vec::new();
564+
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name, dates).await
565+
{
519566
if let Err(err) =
520567
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
521568
{

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ impl IngestServer {
169169
.authorize_for_stream(Action::DeleteStream),
170170
),
171171
)
172+
.service(
173+
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
174+
web::resource("/info").route(
175+
web::get()
176+
.to(logstream::get_stream_info)
177+
.authorize_for_stream(Action::GetStream),
178+
),
179+
)
172180
.service(
173181
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
174182
web::resource("/stats").route(
@@ -191,6 +199,15 @@ impl IngestServer {
191199
.to(logstream::get_cache_enabled)
192200
.authorize_for_stream(Action::GetCacheEnabled),
193201
),
202+
)
203+
.service(
204+
web::scope("/retention").service(
205+
web::resource("/cleanup").route(
206+
web::post()
207+
.to(logstream::retention_cleanup)
208+
.authorize_for_stream(Action::PutRetention),
209+
),
210+
),
194211
),
195212
)
196213
}

server/src/storage.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ pub enum ObjectStorageError {
177177
// no such key inside the object storage
178178
#[error("{0} not found")]
179179
NoSuchKey(String),
180+
#[error("Invalid Request: {0}")]
181+
Invalid(#[from] anyhow::Error),
180182

181183
// custom
182184
#[error("{0}")]

server/src/storage/retention.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -193,31 +193,25 @@ mod action {
193193
use itertools::Itertools;
194194
use relative_path::RelativePathBuf;
195195

196-
use crate::{
197-
catalog::{self, remove_manifest_from_snapshot},
198-
metadata,
199-
option::CONFIG,
200-
};
196+
use crate::{catalog::remove_manifest_from_snapshot, metadata, option::CONFIG};
201197

202198
pub(super) async fn delete(stream_name: String, days: u32) {
203199
log::info!("running retention task - delete for stream={stream_name}");
204200
let retain_until = get_retain_until(Utc::now().date_naive(), days as u64);
205-
206-
let Ok(dates) = CONFIG
201+
let Ok(mut dates) = CONFIG
207202
.storage()
208203
.get_object_store()
209204
.list_dates(&stream_name)
210205
.await
211206
else {
212207
return;
213208
};
214-
209+
dates.retain(|date| date.starts_with("date"));
215210
let dates_to_delete = dates
216211
.into_iter()
217212
.filter(|date| string_to_date(date) < retain_until)
218213
.collect_vec();
219214
let dates = dates_to_delete.clone();
220-
221215
let delete_tasks = FuturesUnordered::new();
222216
for date in dates_to_delete {
223217
let path = RelativePathBuf::from_iter([&stream_name, &date]);
@@ -240,13 +234,8 @@ mod action {
240234

241235
let store = CONFIG.storage().get_object_store();
242236
let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await;
243-
if let Err(err) = res {
244-
log::error!("Failed to update manifest list in the snapshot {err:?}")
245-
}
246-
247-
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
248-
if let Err(err) =
249-
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
237+
if let Ok(first_event_at) = res {
238+
if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
250239
{
251240
log::error!(
252241
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",

0 commit comments

Comments
 (0)