From 214d466112e9bef7f7eab84814cb4bf72bf1db0c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 18 Apr 2024 13:19:00 +0530 Subject: [PATCH 1/3] fix: caching for distributed cache can be enabled from querier UI querier calls the PUT /cache API to all ingesters at ingester, first checks if stream exists, if not found in local map, check in S3 and create stream then checks if caching env vars are set if yes, add cache_enabled flag to STREAM_INFO and update its stream.json in S3 Note: Query still doesn't use the cached data Fixes: 764 --- server/src/handlers/http/cluster/mod.rs | 42 ++++++++++++++++- server/src/handlers/http/logstream.rs | 45 +++++++++++++++---- .../src/handlers/http/modal/ingest_server.rs | 6 +++ 3 files changed, 84 insertions(+), 9 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index a4720506f..be3aa3af7 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -48,6 +48,46 @@ use super::base_path_without_preceding_slash; use super::modal::IngestorMetadata; +pub async fn sync_cache_with_ingestors( + url: &str, + ingestor: IngestorMetadata, + body: bool, +) -> Result<(), StreamError> { + if !utils::check_liveness(&ingestor.domain_name).await { + return Ok(()); + } + let request_body: Bytes = Bytes::from(body.to_string()); + let client = reqwest::Client::new(); + let resp = client + .put(url) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, ingestor.token) + .body(request_body) + .send() + .await + .map_err(|err| { + // log the error and return a custom error + log::error!( + "Fatal: failed to set cache: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + StreamError::Network(err) + })?; + + // if the response is not successful, log the error and return a custom error + // this could be a bit too much, but we need to be sure it covers all cases + if !resp.status().is_success() { + log::error!( + "failed 
to set cache: {}\nResponse Returned: {:?}", + ingestor.domain_name, + resp.text().await + ); + } + + Ok(()) +} + // forward the request to all ingestors to keep them in sync #[allow(dead_code)] pub async fn sync_streams_with_ingestors( @@ -218,7 +258,7 @@ pub async fn send_stream_delete_request( log::error!( "failed to delete stream: {}\nResponse Returned: {:?}", ingestor.domain_name, - resp + resp.text().await ); } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e244b1d02..655208ed6 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -25,7 +25,6 @@ use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; - use super::base_path_without_preceding_slash; use super::cluster::fetch_stats_from_ingestors; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; @@ -301,15 +300,35 @@ pub async fn put_enable_cache( req: HttpRequest, body: web::Json, ) -> Result { - let enable_cache = body.into_inner(); let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let storage = CONFIG.storage().get_object_store(); - - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - if CONFIG.parseable.mode == Mode::Ingest { - // here the ingest server has not found the stream + + match CONFIG.parseable.mode { + Mode::Query => { + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + StreamError::from(err) + })?; + for ingestor in ingestor_metadata { + let url = format!( + 
"{}{}/logstream/{}/cache", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + // delete the stream + super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), body.clone()).await?; + } + } + Mode::Ingest => { + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } + // here the ingest server has not found the stream // so it should check if the stream exists in storage let streams = storage.list_streams().await?; if !streams.contains(&LogStream { @@ -327,7 +346,17 @@ pub async fn put_enable_cache( ) .await .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; + } + Mode::All => { + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } + } } + let enable_cache = body.into_inner(); let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?; stream_metadata.cache_enabled = enable_cache; storage diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index ac92c7ca1..59253862e 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -179,6 +179,12 @@ impl IngestServer { web::put() .to(logstream::put_enable_cache) .authorize_for_stream(Action::PutCacheEnabled), + ) + // GET "/logstream/{logstream}/cache" ==> Get cache enabled status for given logstream + .route( + web::get() + .to(logstream::get_cache_enabled) + .authorize_for_stream(Action::GetCacheEnabled), ), ), ) From e939bf3612c6dd99d4eecab0f8671f6c013bee6b Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 18 Apr 2024 13:29:29 +0530 Subject: [PATCH 2/3] fixed clippy suggestions and fmt --- server/src/handlers/http/logstream.rs | 48 +++++++++++++-------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff
--git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 655208ed6..7acf6793b 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -17,6 +17,9 @@ */ use self::error::{CreateStreamError, StreamError}; +use super::base_path_without_preceding_slash; +use super::cluster::fetch_stats_from_ingestors; +use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use crate::alerts::Alerts; use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; use crate::metadata::STREAM_INFO; @@ -25,9 +28,6 @@ use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; -use super::base_path_without_preceding_slash; -use super::cluster::fetch_stats_from_ingestors; -use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; use arrow_schema::{Field, Schema}; @@ -302,9 +302,9 @@ pub async fn put_enable_cache( ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let storage = CONFIG.storage().get_object_store(); - + match CONFIG.parseable.mode { - Mode::Query => { + Mode::Query => { if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } @@ -319,9 +319,9 @@ pub async fn put_enable_cache( base_path_without_preceding_slash(), stream_name ); - + // delete the stream - super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), body.clone()).await?; + super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?; } } Mode::Ingest => { @@ -329,23 +329,23 @@ pub async fn put_enable_cache( return Err(StreamError::CacheNotEnabled(stream_name)); } // here the ingest server has not 
found the stream - // so it should check if the stream exists in storage - let streams = storage.list_streams().await?; - if !streams.contains(&LogStream { - name: stream_name.clone().to_owned(), - }) { - log::error!("Stream {} not found", stream_name.clone()); - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - metadata::STREAM_INFO - .upsert_stream_info( - &*storage, - LogStream { - name: stream_name.clone().to_owned(), - }, - ) - .await - .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; + // so it should check if the stream exists in storage + let streams = storage.list_streams().await?; + if !streams.contains(&LogStream { + name: stream_name.clone().to_owned(), + }) { + log::error!("Stream {} not found", stream_name.clone()); + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + metadata::STREAM_INFO + .upsert_stream_info( + &*storage, + LogStream { + name: stream_name.clone().to_owned(), + }, + ) + .await + .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; } Mode::All => { if !metadata::STREAM_INFO.stream_exists(&stream_name) { From da58054757ca001764b0de27be6b5e0c1006ed95 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 18 Apr 2024 16:13:18 +0530 Subject: [PATCH 3/3] review comments implemented --- server/src/handlers/http/logstream.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 7acf6793b..9e7f40324 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -33,6 +33,7 @@ use actix_web::{web, HttpRequest, Responder}; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; +use itertools::Itertools; use serde_json::Value; use std::collections::HashMap; use std::fs; @@ -320,7 +321,6 @@ pub async fn put_enable_cache( stream_name ); - // delete the stream super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), 
*body).await?; } } @@ -330,10 +330,14 @@ pub async fn put_enable_cache( } // here the ingest server has not found the stream // so it should check if the stream exists in storage - let streams = storage.list_streams().await?; - if !streams.contains(&LogStream { - name: stream_name.clone().to_owned(), - }) { + let check = storage + .list_streams() + .await? + .iter() + .map(|stream| stream.name.clone()) + .contains(&stream_name); + + if !check { log::error!("Stream {} not found", stream_name.clone()); return Err(StreamError::StreamNotFound(stream_name.clone())); }