diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index a4720506f..be3aa3af7 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -48,6 +48,46 @@ use super::base_path_without_preceding_slash; use super::modal::IngestorMetadata; +pub async fn sync_cache_with_ingestors( + url: &str, + ingestor: IngestorMetadata, + body: bool, +) -> Result<(), StreamError> { + if !utils::check_liveness(&ingestor.domain_name).await { + return Ok(()); + } + let request_body: Bytes = Bytes::from(body.to_string()); + let client = reqwest::Client::new(); + let resp = client + .put(url) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, ingestor.token) + .body(request_body) + .send() + .await + .map_err(|err| { + // log the error and return a custom error + log::error!( + "Fatal: failed to set cache: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + StreamError::Network(err) + })?; + + // if the response is not successful, log the error and return a custom error + // this could be a bit too much, but we need to be sure it covers all cases + if !resp.status().is_success() { + log::error!( + "failed to set cache: {}\nResponse Returned: {:?}", + ingestor.domain_name, + resp.text().await + ); + } + + Ok(()) +} + // forward the request to all ingestors to keep them in sync #[allow(dead_code)] pub async fn sync_streams_with_ingestors( @@ -218,7 +258,7 @@ pub async fn send_stream_delete_request( log::error!( "failed to delete stream: {}\nResponse Returned: {:?}", ingestor.domain_name, - resp + resp.text().await ); } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e244b1d02..9e7f40324 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -17,6 +17,9 @@ */ use self::error::{CreateStreamError, StreamError}; +use super::base_path_without_preceding_slash; +use super::cluster::fetch_stats_from_ingestors; +use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use crate::alerts::Alerts; use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; use crate::metadata::STREAM_INFO; @@ -25,15 +28,12 @@ use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; - -use super::base_path_without_preceding_slash; -use super::cluster::fetch_stats_from_ingestors; -use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; +use itertools::Itertools; use serde_json::Value; use std::collections::HashMap; use std::fs; @@ -301,33 +301,66 @@ pub async fn put_enable_cache( req: HttpRequest, body: web::Json, ) -> Result { - let enable_cache = body.into_inner(); let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let storage = CONFIG.storage().get_object_store(); - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - if CONFIG.parseable.mode == Mode::Ingest { - // here the ingest server has not found the stream - // so it should check if the stream exists in storage - let streams = storage.list_streams().await?; - if !streams.contains(&LogStream { - name: stream_name.clone().to_owned(), - }) { - log::error!("Stream {} not found", stream_name.clone()); - return Err(StreamError::StreamNotFound(stream_name.clone())); + match CONFIG.parseable.mode { + Mode::Query => { + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + StreamError::from(err) + })?; + for ingestor in ingestor_metadata { + let url = format!( + "{}{}/logstream/{}/cache", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?; + } + } + Mode::Ingest => { + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } + // here the ingest server has not found the stream + // so it should check if the stream exists in storage + let check = storage + .list_streams() + .await? + .iter() + .map(|stream| stream.name.clone()) + .contains(&stream_name); + + if !check { + log::error!("Stream {} not found", stream_name.clone()); + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + metadata::STREAM_INFO + .upsert_stream_info( + &*storage, + LogStream { + name: stream_name.clone().to_owned(), + }, + ) + .await + .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; + } + Mode::All => { + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } } - metadata::STREAM_INFO - .upsert_stream_info( - &*storage, - LogStream { - name: stream_name.clone().to_owned(), - }, - ) - .await - .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; } + let enable_cache = body.into_inner(); let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?; stream_metadata.cache_enabled = enable_cache; storage diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index ac92c7ca1..59253862e 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -179,6 +179,12 @@ impl IngestServer { web::put() .to(logstream::put_enable_cache) .authorize_for_stream(Action::PutCacheEnabled), + ) + // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream + .route( + web::get() + .to(logstream::get_cache_enabled) + .authorize_for_stream(Action::GetCacheEnabled), ), ), )