Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 88 additions & 38 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@

use std::{io::ErrorKind, sync::Arc};

use self::{column::Column, snapshot::ManifestItem};
use crate::handlers::http::base_path_without_preceding_slash;
use crate::option::CONFIG;
use crate::{
catalog::manifest::Manifest,
query::PartialTimeFilter,
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
use crate::{handlers, Mode};
use bytes::Bytes;
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
use relative_path::RelativePathBuf;
use std::io::Error as IOError;

use self::{column::Column, snapshot::ManifestItem};

pub mod column;
pub mod manifest;
pub mod snapshot;
Expand Down Expand Up @@ -208,51 +210,99 @@ pub async fn remove_manifest_from_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
dates: Vec<String>,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
) -> Result<Option<String>, ObjectStorageError> {
match CONFIG.parseable.mode {
Mode::All | Mode::Ingest => {
if !dates.is_empty() {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests
.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
let first_event_at = get_first_event(storage.clone(), stream_name, Vec::new()).await?;

storage.put_snapshot(stream_name, meta.snapshot).await?;
Ok(())
Ok(first_event_at)
}
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
}
}

pub async fn get_first_event(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}

let manifest = &manifests[0];

let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
let mut first_event_at: String = String::default();
match CONFIG.parseable.mode {
Mode::All | Mode::Ingest => {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}
let manifest = &manifests[0];
let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
}
}
Mode::Query => {
let ingestor_metadata =
handlers::http::cluster::get_ingestor_info()
.await
.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
ObjectStorageError::from(err)
})?;
let mut ingestors_first_event_at: Vec<String> = Vec::new();
for ingestor in ingestor_metadata {
let url = format!(
"{}{}/logstream/{}/retention/cleanup",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);
// Convert dates vector to Bytes object
let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
// delete the stream

if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
let ingestor_first_event_at =
handlers::http::cluster::send_retention_cleanup_request(
&url,
ingestor.clone(),
dates_bytes,
)
.await?;
if !ingestor_first_event_at.is_empty() {
ingestors_first_event_at.push(ingestor_first_event_at);
}
}
if ingestors_first_event_at.is_empty() {
return Ok(None);
}
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
}
}
Ok(None)

Ok(Some(first_event_at))
}

/// Partition the path to which this manifest belongs.
Expand Down
50 changes: 48 additions & 2 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use serde::de::Error;
use serde_json::error::Error as SerdeError;
use serde_json::Value as JsonValue;
use url::Url;

type IngestorMetadataArr = Vec<IngestorMetadata>;

use self::utils::StorageStats;
Expand Down Expand Up @@ -229,7 +228,7 @@ async fn send_stream_sync_request(
Ok(())
}

/// send a rollback request to all ingestors
/// send a delete stream request to all ingestors
pub async fn send_stream_delete_request(
url: &str,
ingestor: IngestorMetadata,
Expand Down Expand Up @@ -267,6 +266,53 @@ pub async fn send_stream_delete_request(
Ok(())
}

/// Send a retention-cleanup request to a single ingestor and return the
/// ingestor's reported `first_event_at` timestamp.
///
/// Returns an empty string (the "nothing to report" sentinel the caller
/// already filters out) when the ingestor is not live or the request is
/// rejected. Returns `Err` only on transport failures.
pub async fn send_retention_cleanup_request(
    url: &str,
    ingestor: IngestorMetadata,
    body: Bytes,
) -> Result<String, ObjectStorageError> {
    // Skip ingestors that are not reachable; the empty sentinel tells the
    // caller there is nothing to aggregate from this node.
    if !utils::check_liveness(&ingestor.domain_name).await {
        return Ok(String::default());
    }
    let client = reqwest::Client::new();
    let resp = client
        .post(url)
        .header(header::CONTENT_TYPE, "application/json")
        .header(header::AUTHORIZATION, ingestor.token)
        .body(body)
        .send()
        .await
        .map_err(|err| {
            // log the error and return a custom error
            log::error!(
                "Fatal: failed to perform cleanup on retention: {}\n Error: {:?}",
                ingestor.domain_name,
                err
            );
            ObjectStorageError::Custom(err.to_string())
        })?;

    // Bug fix: previously a non-success status was only logged and the error
    // body was still returned as if it were a `first_event_at` timestamp,
    // which polluted the caller's `min()` aggregation across ingestors.
    // Return the empty sentinel instead so this node is ignored.
    if !resp.status().is_success() {
        log::error!(
            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
            ingestor.domain_name,
            resp.status()
        );
        return Ok(String::default());
    }

    let resp_data = resp.bytes().await.map_err(|err| {
        log::error!("Fatal: failed to parse response to bytes: {:?}", err);
        ObjectStorageError::Custom(err.to_string())
    })?;

    // The endpoint replies with a plain-text RFC 3339 timestamp (or nothing).
    Ok(String::from_utf8_lossy(&resp_data).to_string())
}

pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
Expand Down
51 changes: 49 additions & 2 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
use crate::{catalog, event, stats};
use crate::{
catalog::{self, remove_manifest_from_snapshot},
event, stats,
};
use crate::{metadata, validator};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
Expand Down Expand Up @@ -87,6 +90,48 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

pub async fn retention_cleanup(
req: HttpRequest,
body: Bytes,
) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let storage = CONFIG.storage().get_object_store();
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
// here the ingest server has not found the stream
// so it should check if the stream exists in storage
let check = storage
.list_streams()
.await?
.iter()
.map(|stream| stream.name.clone())
.contains(&stream_name);

if !check {
log::error!("Stream {} not found", stream_name.clone());
return Err(StreamError::StreamNotFound(stream_name.clone()));
}
metadata::STREAM_INFO
.upsert_stream_info(
&*storage,
LogStream {
name: stream_name.clone().to_owned(),
},
)
.await
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
}
let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
let mut first_event_at: Option<String> = None;
if let Err(err) = res {
log::error!("Failed to update manifest list in the snapshot {err:?}")
} else {
first_event_at = res.unwrap();
}

Ok((first_event_at, StatusCode::OK))
}

pub async fn list(_: HttpRequest) -> impl Responder {
let res: Vec<LogStream> = STREAM_INFO
.list_streams()
Expand Down Expand Up @@ -515,7 +560,9 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE

if first_event_at_empty(&stream_name) {
let store = CONFIG.storage().get_object_store();
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
let dates: Vec<String> = Vec::new();
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name, dates).await
{
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
{
Expand Down
17 changes: 17 additions & 0 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ impl IngestServer {
.authorize_for_stream(Action::DeleteStream),
),
)
.service(
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
web::resource("/info").route(
web::get()
.to(logstream::get_stream_info)
.authorize_for_stream(Action::GetStream),
),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource("/stats").route(
Expand All @@ -191,6 +199,15 @@ impl IngestServer {
.to(logstream::get_cache_enabled)
.authorize_for_stream(Action::GetCacheEnabled),
),
)
.service(
web::scope("/retention").service(
web::resource("/cleanup").route(
web::post()
.to(logstream::retention_cleanup)
.authorize_for_stream(Action::PutRetention),
),
),
),
)
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ pub enum ObjectStorageError {
// no such key inside the object storage
#[error("{0} not found")]
NoSuchKey(String),
#[error("Invalid Request: {0}")]
Invalid(#[from] anyhow::Error),

// custom
#[error("{0}")]
Expand Down
21 changes: 5 additions & 16 deletions server/src/storage/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,31 +193,25 @@ mod action {
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::{
catalog::{self, remove_manifest_from_snapshot},
metadata,
option::CONFIG,
};
use crate::{catalog::remove_manifest_from_snapshot, metadata, option::CONFIG};

pub(super) async fn delete(stream_name: String, days: u32) {
log::info!("running retention task - delete for stream={stream_name}");
let retain_until = get_retain_until(Utc::now().date_naive(), days as u64);

let Ok(dates) = CONFIG
let Ok(mut dates) = CONFIG
.storage()
.get_object_store()
.list_dates(&stream_name)
.await
else {
return;
};

dates.retain(|date| date.starts_with("date"));
let dates_to_delete = dates
.into_iter()
.filter(|date| string_to_date(date) < retain_until)
.collect_vec();
let dates = dates_to_delete.clone();

let delete_tasks = FuturesUnordered::new();
for date in dates_to_delete {
let path = RelativePathBuf::from_iter([&stream_name, &date]);
Expand All @@ -240,13 +234,8 @@ mod action {

let store = CONFIG.storage().get_object_store();
let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await;
if let Err(err) = res {
log::error!("Failed to update manifest list in the snapshot {err:?}")
}

if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
if let Ok(first_event_at) = res {
if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
{
log::error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
Expand Down