From c6e416f11445f7fac0d9634527090ea160dd76fb Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Mon, 29 Jul 2024 22:33:22 +0530
Subject: [PATCH 01/12] switched to tokio tasks

---
 .../src/handlers/http/modal/ingest_server.rs |   18 +-
 .../src/handlers/http/modal/query_server.rs  |  417 +++----
 server/src/handlers/http/modal/server.rs     | 1079 +++++++++--------
 server/src/storage/retention.rs              |  157 ++-
 server/src/sync.rs                           |  143 +--
 5 files changed, 886 insertions(+), 928 deletions(-)

diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index b5f31a4a5..91eafac6b 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -330,9 +330,9 @@ impl IngestServer {
 
         migration::run_migration(&CONFIG).await?;
 
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await;
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
-            sync::object_store_sync();
+            sync::object_store_sync().await;
 
         tokio::spawn(airplane::server());
 
@@ -345,8 +345,12 @@ impl IngestServer {
                 // actix server finished .. stop other threads and stop the server
                 remote_sync_inbox.send(()).unwrap_or(());
                 localsync_inbox.send(()).unwrap_or(());
-                localsync_handler.join().unwrap_or(());
-                remote_sync_handler.join().unwrap_or(());
+                if let Err(e) = localsync_handler.await {
+                    log::error!("Error joining localsync_handler: {:?}", e);
+                }
+                if let Err(e) = remote_sync_handler.await {
+                    log::error!("Error joining remote_sync_handler: {:?}", e);
+                }
                 return e
             },
             _ = &mut localsync_outbox => {
@@ -356,8 +360,10 @@ impl IngestServer {
             },
             _ = &mut remote_sync_outbox => {
                 // remote_sync failed, this is recoverable by just starting remote_sync thread again
-                remote_sync_handler.join().unwrap_or(());
-                (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
+                if let Err(e) = remote_sync_handler.await {
+                    log::error!("Error joining remote_sync_handler: {:?}", e);
+                }
+                (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
             }
         };
 
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index c1323f8e2..9e64fde11 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -16,208 +16,215 @@
  *
  */
 
-use crate::handlers::airplane;
-use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
-use crate::handlers::http::middleware::RouteExt;
-use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
-
-use crate::rbac::role::Action;
-use crate::sync;
-use crate::users::dashboards::DASHBOARDS;
-use crate::users::filters::FILTERS;
-use crate::{analytics, banner, metrics, migration, rbac, storage};
-use actix_web::web;
-use actix_web::web::ServiceConfig;
-use actix_web::{App, HttpServer};
-use async_trait::async_trait;
-use std::sync::Arc;
-
-use crate::option::CONFIG;
-
-use super::server::Server;
-use super::ssl_acceptor::get_ssl_acceptor;
-use super::{OpenIdClient, ParseableServer};
-
-#[derive(Default, Debug)]
-pub struct QueryServer;
-
-#[async_trait(?Send)]
-impl ParseableServer for QueryServer {
-    async fn start(
-        &self,
-        prometheus: actix_web_prometheus::PrometheusMetrics,
-        oidc_client: Option,
-    ) -> anyhow::Result<()> {
-        let oidc_client = match 
oidc_client { - Some(config) => { - let client = config - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - Some(Arc::new(client)) - } - - None => None, - }; - - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - )?; - - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|config| QueryServer::configure_routes(config, oidc_client.clone())) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); - if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - .await?; - } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } - - Ok(()) - } - - /// implementation of init should just invoke a call to initialize - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - // initialize the rbac map - rbac::map::init(&metadata); - // keep metadata info in mem - metadata.set_global(); - self.initialize().await - } - - fn validate(&self) -> anyhow::Result<()> { - if CONFIG.get_storage_mode_string() == "Local drive" { - return Err(anyhow::anyhow!( - "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.", - )); - } - - Ok(()) - } -} - -impl QueryServer { - // configure the api routes - fn configure_routes(config: &mut ServiceConfig, oidc_client: Option) { - config - .service( - web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body - .service(Server::get_query_factory()) - .service(Server::get_cache_webscope()) - .service(Server::get_liveness_factory()) - .service(Server::get_readiness_factory()) - .service(Server::get_about_factory()) - .service(Server::get_logstream_webscope()) - .service(Server::get_user_webscope()) - .service(Server::get_dashboards_webscope()) - .service(Server::get_filters_webscope()) - .service(Server::get_llm_webscope()) - .service(Server::get_oauth_webscope(oidc_client)) - .service(Server::get_user_role_webscope()) - .service(Self::get_cluster_web_scope()), - ) - .service(Server::get_generated()); - } - - fn get_cluster_web_scope() -> actix_web::Scope { - web::scope("/cluster") - .service( - // GET "/cluster/info" ==> Get info of the cluster - web::resource("/info").route( - web::get() - .to(cluster::get_cluster_info) - .authorize(Action::ListCluster), - ), - ) - // GET "/cluster/metrics" ==> Get metrics of the cluster - .service( - web::resource("/metrics").route( - web::get() - .to(cluster::get_cluster_metrics) - .authorize(Action::ListClusterMetrics), - ), - ) - // DELETE "/cluster/{ingestor_domain:port}" ==> Delete an ingestor from the cluster - .service( - web::scope("/{ingestor}").service( - web::resource("").route( - web::delete() - .to(cluster::remove_ingestor) - .authorize(Action::Deleteingestor), - ), - ), - ) - } - - /// initialize the server, run migrations as needed and start the server - async fn initialize(&self) -> anyhow::Result<()> { - let prometheus = 
metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - // track all parquet files already in the data directory - storage::retention::load_retention_from_global(); - - // all internal data structures populated now. - // start the analytics scheduler if enabled - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - if matches!(init_cluster_metrics_schedular(), Ok(())) { - log::info!("Cluster metrics scheduler started successfully"); - } - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync(); - - tokio::spawn(airplane::server()); - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - localsync_handler.join().unwrap_or(()); - remote_sync_handler.join().unwrap_or(()); - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - remote_sync_handler.join().unwrap_or(()); - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync(); - } - - }; - } - } -} + use crate::handlers::airplane; + use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; + use crate::handlers::http::middleware::RouteExt; + use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; + + use crate::rbac::role::Action; + use crate::sync; + use crate::users::dashboards::DASHBOARDS; + use crate::users::filters::FILTERS; + use crate::{analytics, banner, metrics, migration, rbac, storage}; + use actix_web::web; + use actix_web::web::ServiceConfig; + use actix_web::{App, HttpServer}; + use async_trait::async_trait; + use std::sync::Arc; + + use crate::option::CONFIG; + + use super::server::Server; + use super::ssl_acceptor::get_ssl_acceptor; + use super::{OpenIdClient, ParseableServer}; + + #[derive(Default, Debug)] + pub struct QueryServer; + + #[async_trait(?Send)] + impl ParseableServer for QueryServer { + async fn start( + &self, + prometheus: actix_web_prometheus::PrometheusMetrics, + oidc_client: Option, + ) -> anyhow::Result<()> { + let oidc_client = match oidc_client { + Some(config) => { + let client = config + .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) + .await?; + Some(Arc::new(client)) + } + + None => None, + }; + + let ssl = get_ssl_acceptor( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + )?; + + let create_app_fn = move || { + App::new() + .wrap(prometheus.clone()) + .configure(|config| QueryServer::configure_routes(config, oidc_client.clone())) + .wrap(actix_web::middleware::Logger::default()) + .wrap(actix_web::middleware::Compress::default()) + .wrap(cross_origin_config()) + }; + + // concurrent workers equal to 
number of cores on the cpu + let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); + if let Some(config) = ssl { + http_server + .bind_rustls_0_22(&CONFIG.parseable.address, config)? + .run() + .await?; + } else { + http_server.bind(&CONFIG.parseable.address)?.run().await?; + } + + Ok(()) + } + + /// implementation of init should just invoke a call to initialize + async fn init(&self) -> anyhow::Result<()> { + self.validate()?; + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + banner::print(&CONFIG, &metadata).await; + // initialize the rbac map + rbac::map::init(&metadata); + // keep metadata info in mem + metadata.set_global(); + self.initialize().await + } + + fn validate(&self) -> anyhow::Result<()> { + if CONFIG.get_storage_mode_string() == "Local drive" { + return Err(anyhow::anyhow!( + "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.", + )); + } + + Ok(()) + } + } + + impl QueryServer { + // configure the api routes + fn configure_routes(config: &mut ServiceConfig, oidc_client: Option) { + config + .service( + web::scope(&base_path()) + // POST "/query" ==> Get results of the SQL query passed in request body + .service(Server::get_query_factory()) + .service(Server::get_cache_webscope()) + .service(Server::get_liveness_factory()) + .service(Server::get_readiness_factory()) + .service(Server::get_about_factory()) + .service(Server::get_logstream_webscope()) + .service(Server::get_user_webscope()) + .service(Server::get_dashboards_webscope()) + .service(Server::get_filters_webscope()) + .service(Server::get_llm_webscope()) + .service(Server::get_oauth_webscope(oidc_client)) + .service(Server::get_user_role_webscope()) + .service(Self::get_cluster_web_scope()), + ) + .service(Server::get_generated()); + } + + fn get_cluster_web_scope() -> actix_web::Scope { + web::scope("/cluster") + .service( + // GET "/cluster/info" ==> Get info of the cluster + web::resource("/info").route( + web::get() + .to(cluster::get_cluster_info) + .authorize(Action::ListCluster), + ), + ) + // GET "/cluster/metrics" ==> Get metrics of the cluster + .service( + web::resource("/metrics").route( + web::get() + .to(cluster::get_cluster_metrics) + .authorize(Action::ListClusterMetrics), + ), + ) + // DELETE "/cluster/{ingestor_domain:port}" ==> Delete an ingestor from the cluster + .service( + web::scope("/{ingestor}").service( + web::resource("").route( + web::delete() + .to(cluster::remove_ingestor) + .authorize(Action::Deleteingestor), + ), + ), + ) + } + + /// initialize the server, run migrations as needed and start the server + async fn initialize(&self) -> anyhow::Result<()> { + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + FILTERS.load().await?; + DASHBOARDS.load().await?; + // track all parquet files already in the data directory + storage::retention::load_retention_from_global(); + + // all internal data structures populated now. 
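+         // note: the schedulers and sync loops below are spawned as tokio
+         // tasks on the shared runtime; this patch series replaces the
+         // dedicated OS threads that were used previously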
+ // start the analytics scheduler if enabled + if CONFIG.parseable.send_analytics { + analytics::init_analytics_scheduler()?; + } + + if matches!(init_cluster_metrics_schedular(), Ok(())) { + log::info!("Cluster metrics scheduler started successfully"); + } + let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + tokio::spawn(airplane::server()); + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining localsync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } + } + } + \ No newline at end of file diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index ceaf35810..08202013d 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -16,539 +16,546 @@ * */ -use crate::analytics; -use crate::banner; -use crate::handlers; -use crate::handlers::http::about; -use crate::handlers::http::base_path; -use crate::handlers::http::cache; -use crate::handlers::http::health_check; -use crate::handlers::http::query; -use crate::handlers::http::users::dashboards; -use crate::handlers::http::users::filters; -use crate::handlers::http::API_BASE_PATH; -use crate::handlers::http::API_VERSION; -use crate::localcache::LocalCacheManager; -use crate::metrics; -use crate::migration; -use crate::rbac; -use crate::storage; -use crate::sync; -use crate::users::dashboards::DASHBOARDS; -use crate::users::filters::FILTERS; -use std::sync::Arc; - -use actix_web::web::resource; -use actix_web::Resource; -use actix_web::Scope; -use actix_web::{web, App, HttpServer}; -use actix_web_prometheus::PrometheusMetrics; -use actix_web_static_files::ResourceFiles; -use async_trait::async_trait; - -use crate::{ - handlers::http::{ - self, cross_origin_config, ingest, llm, logstream, - middleware::{DisAllowRootUser, RouteExt}, - oidc, role, MAX_EVENT_PAYLOAD_SIZE, - }, - option::CONFIG, - rbac::role::Action, -}; - -// use super::generate; -use super::generate; -use super::ssl_acceptor::get_ssl_acceptor; -use super::OpenIdClient; -use super::ParseableServer; - -#[derive(Default)] -pub struct Server; - -#[async_trait(?Send)] -impl ParseableServer for Server { - async fn start( - &self, - prometheus: PrometheusMetrics, - oidc_client: Option, - ) -> anyhow::Result<()> { - let oidc_client = 
match oidc_client { - Some(config) => { - let client = config - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - Some(Arc::new(client)) - } - None => None, - }; - - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone())) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - )?; - - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); - if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - .await?; - } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } - - Ok(()) - } - - /// implementation of init should just invoke a call to initialize - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - metadata.set_global(); - self.initialize().await?; - Ok(()) - } - - fn validate(&self) -> anyhow::Result<()> { - Ok(()) - } -} - -impl Server { - fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option) { - // there might be a bug in the configure routes method - config - .service( - web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body - .service(Self::get_query_factory()) - .service(Self::get_cache_webscope()) - .service(Self::get_ingest_factory()) - .service(Self::get_liveness_factory()) - .service(Self::get_readiness_factory()) - .service(Self::get_about_factory()) - .service(Self::get_logstream_webscope()) - .service(Self::get_user_webscope()) - .service(Self::get_dashboards_webscope()) - .service(Self::get_filters_webscope()) - .service(Self::get_llm_webscope()) - .service(Self::get_oauth_webscope(oidc_client)) - .service(Self::get_user_role_webscope()), - ) - .service(Self::get_ingest_otel_factory()) - .service(Self::get_generated()); - } - - // get the dashboards web scope - pub fn get_dashboards_webscope() -> Scope { - web::scope("/dashboards") - .service( - web::resource("").route( - web::post() - .to(dashboards::post) - .authorize(Action::CreateDashboard), - ), - ) - .service( - web::scope("/dashboard").service( - web::resource("/{dashboard_id}") - .route( - web::get() - .to(dashboards::get) - .authorize(Action::GetDashboard), - ) - .route( - web::delete() - .to(dashboards::delete) - .authorize(Action::DeleteDashboard), - ) - .route( - web::put() - .to(dashboards::update) - .authorize(Action::CreateDashboard), - ), - ), - ) - .service( - web::scope("/{user_id}").service( - web::resource("").route( - web::get() - .to(dashboards::list) - .authorize(Action::ListDashboard), - ), - ), - ) - } - - // get the filters web scope - pub fn get_filters_webscope() -> Scope { - web::scope("/filters") - .service( - web::resource("").route( - web::post() - .to(filters::post) - .authorize(Action::CreateFilter), - ), - ) - .service( - web::scope("/filter").service( - web::resource("/{filter_id}") - .route(web::get().to(filters::get).authorize(Action::GetFilter)) - 
.route( - web::delete() - .to(filters::delete) - .authorize(Action::DeleteFilter), - ) - .route( - web::put() - .to(filters::update) - .authorize(Action::CreateFilter), - ), - ), - ) - .service(web::scope("/{user_id}").service( - web::resource("").route(web::get().to(filters::list).authorize(Action::ListFilter)), - )) - } - - // get the query factory - pub fn get_query_factory() -> Resource { - web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) - } - - pub fn get_cache_webscope() -> Scope { - web::scope("/cache").service( - web::scope("/{user_id}").service( - web::scope("/{stream}").service( - web::resource("") - .route(web::get().to(cache::list).authorize(Action::ListCache)) - .route(web::post().to(cache::remove).authorize(Action::RemoveCache)), - ), - ), - ) - } - - // get the logstream web scope - pub fn get_logstream_webscope() -> Scope { - web::scope("/logstream") - .service( - // GET "/logstream" ==> Get list of all Log Streams on the server - web::resource("") - .route(web::get().to(logstream::list).authorize(Action::ListStream)), - ) - .service( - web::scope("/{logstream}") - .service( - web::resource("") - // PUT "/logstream/{logstream}" ==> Create log stream - .route( - web::put() - .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), - ) - // POST "/logstream/{logstream}" ==> Post logs to given log stream - .route( - web::post() - .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), - ) - // DELETE "/logstream/{logstream}" ==> Delete log stream - .route( - web::delete() - .to(logstream::delete) - .authorize_for_stream(Action::DeleteStream), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) - .service( - // GET "/logstream/{logstream}/info" ==> Get info for given log stream - web::resource("/info").route( - web::get() - .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStream), - ), - ) - .service( - web::resource("/alert") - // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream - .route( - web::put() - .to(logstream::put_alert) - .authorize_for_stream(Action::PutAlert), - ) - // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream - .route( - web::get() - .to(logstream::get_alert) - .authorize_for_stream(Action::GetAlert), - ), - ) - .service( - // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource("/schema").route( - web::get() - .to(logstream::schema) - .authorize_for_stream(Action::GetSchema), - ), - ) - .service( - // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream - web::resource("/stats").route( - web::get() - .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), - ), - ) - .service( - web::resource("/retention") - // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_retention) - .authorize_for_stream(Action::PutRetention), - ) - // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream - .route( - web::get() - .to(logstream::get_retention) - .authorize_for_stream(Action::GetRetention), - ), - ) - .service( - web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ) - // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream - .route( - web::get() - .to(logstream::get_cache_enabled) - 
.authorize_for_stream(Action::GetCacheEnabled), - ), - ), - ) - } - - // get the factory for the ingest route - pub fn get_ingest_factory() -> Resource { - web::resource("/ingest") - .route( - web::post() - .to(ingest::ingest) - .authorize_for_stream(Action::Ingest), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) - } - - // /v1/logs endpoint to be used for OTEL log ingestion only - pub fn get_ingest_otel_factory() -> Resource { - web::resource("/v1/logs") - .route( - web::post() - .to(ingest::ingest_otel_logs) - .authorize_for_stream(Action::Ingest), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) - } - - // get the oauth webscope - pub fn get_oauth_webscope(oidc_client: Option) -> Scope { - let oauth = web::scope("/o") - .service(resource("/login").route(web::get().to(oidc::login))) - .service(resource("/logout").route(web::get().to(oidc::logout))) - .service(resource("/code").route(web::get().to(oidc::reply_login))); - - if let Some(client) = oidc_client { - oauth.app_data(web::Data::from(client)) - } else { - oauth - } - } - - // get the role webscope - pub fn get_user_role_webscope() -> Scope { - web::scope("/role") - // GET Role List - .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) - .service( - // PUT and GET Default Role - resource("/default") - .route(web::put().to(role::put_default).authorize(Action::PutRole)) - .route(web::get().to(role::get_default).authorize(Action::GetRole)), - ) - .service( - // PUT, GET, DELETE Roles - resource("/{name}") - .route(web::put().to(role::put).authorize(Action::PutRole)) - .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) - .route(web::get().to(role::get).authorize(Action::GetRole)), - ) - } - - // get the user webscope - pub fn get_user_webscope() -> Scope { - web::scope("/user") - .service( - web::resource("") - // GET /user => List all users - .route( - web::get() - .to(http::rbac::list_users) - .authorize(Action::ListUser), - ), - ) - .service( - web::resource("/{username}") - // PUT /user/{username} => Create a new user - .route( - web::post() - .to(http::rbac::post_user) - .authorize(Action::PutUser), - ) - // DELETE /user/{username} => Delete a user - .route( - web::delete() - .to(http::rbac::delete_user) - .authorize(Action::DeleteUser), - ) - .wrap(DisAllowRootUser), - ) - .service( - web::resource("/{username}/role") - // PUT /user/{username}/roles => Put roles for user - .route( - web::put() - .to(http::rbac::put_role) - .authorize(Action::PutUserRoles) - .wrap(DisAllowRootUser), - ) - .route( - web::get() - .to(http::rbac::get_role) - .authorize_for_user(Action::GetUserRoles), - ), - ) - .service( - web::resource("/{username}/generate-new-password") - // POST /user/{username}/generate-new-password => reset password for this user - .route( - web::post() - .to(http::rbac::post_gen_password) - .authorize(Action::PutUser) - .wrap(DisAllowRootUser), - ), - ) - } - - // get the llm webscope - pub fn get_llm_webscope() -> Scope { - web::scope("/llm").service( - web::resource("").route( - web::post() - .to(llm::make_llm_request) - .authorize(Action::QueryLLM), - ), - ) - } - - // get the live check - // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command - // HEAD "/liveness" - pub fn get_liveness_factory() -> Resource { - web::resource("/liveness") - .route(web::get().to(health_check::liveness)) - 
.route(web::head().to(health_check::liveness)) - } - - // get the readiness check - // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes - // HEAD "/readiness" - pub fn get_readiness_factory() -> Resource { - web::resource("/readiness") - .route(web::get().to(health_check::readiness)) - .route(web::head().to(health_check::readiness)) - } - - // get the about factory - pub fn get_about_factory() -> Resource { - web::resource("/about").route(web::get().to(about::about).authorize(Action::GetAbout)) - } - - // GET "/" ==> Serve the static frontend directory - pub fn get_generated() -> ResourceFiles { - ResourceFiles::new("/", generate()).resolve_not_found_to_root() - } - - async fn initialize(&self) -> anyhow::Result<()> { - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - - storage::retention::load_retention_from_global(); - - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync(); - - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - tokio::spawn(handlers::livetail::server()); - tokio::spawn(handlers::airplane::server()); - - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - localsync_handler.join().unwrap_or(()); - remote_sync_handler.join().unwrap_or(()); - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. 
Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - remote_sync_handler.join().unwrap_or(()); - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync(); - } - }; - } - } -} + use crate::analytics; + use crate::banner; + use crate::handlers; + use crate::handlers::http::about; + use crate::handlers::http::base_path; + use crate::handlers::http::cache; + use crate::handlers::http::health_check; + use crate::handlers::http::query; + use crate::handlers::http::users::dashboards; + use crate::handlers::http::users::filters; + use crate::handlers::http::API_BASE_PATH; + use crate::handlers::http::API_VERSION; + use crate::localcache::LocalCacheManager; + use crate::metrics; + use crate::migration; + use crate::rbac; + use crate::storage; + use crate::sync; + use crate::users::dashboards::DASHBOARDS; + use crate::users::filters::FILTERS; + use std::sync::Arc; + + use actix_web::web::resource; + use actix_web::Resource; + use actix_web::Scope; + use actix_web::{web, App, HttpServer}; + use actix_web_prometheus::PrometheusMetrics; + use actix_web_static_files::ResourceFiles; + use async_trait::async_trait; + + use crate::{ + handlers::http::{ + self, cross_origin_config, ingest, llm, logstream, + middleware::{DisAllowRootUser, RouteExt}, + oidc, role, MAX_EVENT_PAYLOAD_SIZE, + }, + option::CONFIG, + rbac::role::Action, + }; + + // use super::generate; + use super::generate; + use super::ssl_acceptor::get_ssl_acceptor; + use super::OpenIdClient; + use super::ParseableServer; + + #[derive(Default)] + pub struct Server; + + #[async_trait(?Send)] + impl ParseableServer for Server { + async fn start( + &self, + prometheus: PrometheusMetrics, + oidc_client: Option, + ) -> anyhow::Result<()> { + let oidc_client = match oidc_client { + Some(config) => { + let client = config + .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) + .await?; + Some(Arc::new(client)) + } + None => None, + }; + + let create_app_fn = move || { + App::new() + .wrap(prometheus.clone()) + .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone())) + .wrap(actix_web::middleware::Logger::default()) + .wrap(actix_web::middleware::Compress::default()) + .wrap(cross_origin_config()) + }; + + let ssl = get_ssl_acceptor( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + )?; + + // concurrent workers equal to number of cores on the cpu + let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); + if let Some(config) = ssl { + http_server + .bind_rustls_0_22(&CONFIG.parseable.address, config)? 
+ .run() + .await?; + } else { + http_server.bind(&CONFIG.parseable.address)?.run().await?; + } + + Ok(()) + } + + /// implementation of init should just invoke a call to initialize + async fn init(&self) -> anyhow::Result<()> { + self.validate()?; + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + banner::print(&CONFIG, &metadata).await; + rbac::map::init(&metadata); + metadata.set_global(); + self.initialize().await?; + Ok(()) + } + + fn validate(&self) -> anyhow::Result<()> { + Ok(()) + } + } + + impl Server { + fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option) { + // there might be a bug in the configure routes method + config + .service( + web::scope(&base_path()) + // POST "/query" ==> Get results of the SQL query passed in request body + .service(Self::get_query_factory()) + .service(Self::get_cache_webscope()) + .service(Self::get_ingest_factory()) + .service(Self::get_liveness_factory()) + .service(Self::get_readiness_factory()) + .service(Self::get_about_factory()) + .service(Self::get_logstream_webscope()) + .service(Self::get_user_webscope()) + .service(Self::get_dashboards_webscope()) + .service(Self::get_filters_webscope()) + .service(Self::get_llm_webscope()) + .service(Self::get_oauth_webscope(oidc_client)) + .service(Self::get_user_role_webscope()), + ) + .service(Self::get_ingest_otel_factory()) + .service(Self::get_generated()); + } + + // get the dashboards web scope + pub fn get_dashboards_webscope() -> Scope { + web::scope("/dashboards") + .service( + web::resource("").route( + web::post() + .to(dashboards::post) + .authorize(Action::CreateDashboard), + ), + ) + .service( + web::scope("/dashboard").service( + web::resource("/{dashboard_id}") + .route( + web::get() + .to(dashboards::get) + .authorize(Action::GetDashboard), + ) + .route( + web::delete() + .to(dashboards::delete) + .authorize(Action::DeleteDashboard), + ) + .route( + web::put() + .to(dashboards::update) + .authorize(Action::CreateDashboard), + ), + ), + ) + .service( + web::scope("/{user_id}").service( + web::resource("").route( + web::get() + .to(dashboards::list) + .authorize(Action::ListDashboard), + ), + ), + ) + } + + // get the filters web scope + pub fn get_filters_webscope() -> Scope { + web::scope("/filters") + .service( + web::resource("").route( + web::post() + .to(filters::post) + .authorize(Action::CreateFilter), + ), + ) + .service( + web::scope("/filter").service( + web::resource("/{filter_id}") + .route(web::get().to(filters::get).authorize(Action::GetFilter)) + .route( + web::delete() + .to(filters::delete) + .authorize(Action::DeleteFilter), + ) + .route( + web::put() + .to(filters::update) + .authorize(Action::CreateFilter), + ), + ), + ) + .service(web::scope("/{user_id}").service( + web::resource("").route(web::get().to(filters::list).authorize(Action::ListFilter)), + )) + } + + // get the query factory + pub fn get_query_factory() -> Resource { + web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) + } + + pub fn get_cache_webscope() -> Scope { + web::scope("/cache").service( + web::scope("/{user_id}").service( + web::scope("/{stream}").service( + web::resource("") + .route(web::get().to(cache::list).authorize(Action::ListCache)) + .route(web::post().to(cache::remove).authorize(Action::RemoveCache)), + ), + ), + ) + } + + // 
get the logstream web scope + pub fn get_logstream_webscope() -> Scope { + web::scope("/logstream") + .service( + // GET "/logstream" ==> Get list of all Log Streams on the server + web::resource("") + .route(web::get().to(logstream::list).authorize(Action::ListStream)), + ) + .service( + web::scope("/{logstream}") + .service( + web::resource("") + // PUT "/logstream/{logstream}" ==> Create log stream + .route( + web::put() + .to(logstream::put_stream) + .authorize_for_stream(Action::CreateStream), + ) + // POST "/logstream/{logstream}" ==> Post logs to given log stream + .route( + web::post() + .to(ingest::post_event) + .authorize_for_stream(Action::Ingest), + ) + // DELETE "/logstream/{logstream}" ==> Delete log stream + .route( + web::delete() + .to(logstream::delete) + .authorize_for_stream(Action::DeleteStream), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) + .service( + // GET "/logstream/{logstream}/info" ==> Get info for given log stream + web::resource("/info").route( + web::get() + .to(logstream::get_stream_info) + .authorize_for_stream(Action::GetStream), + ), + ) + .service( + web::resource("/alert") + // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream + .route( + web::put() + .to(logstream::put_alert) + .authorize_for_stream(Action::PutAlert), + ) + // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream + .route( + web::get() + .to(logstream::get_alert) + .authorize_for_stream(Action::GetAlert), + ), + ) + .service( + // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream + web::resource("/schema").route( + web::get() + .to(logstream::schema) + .authorize_for_stream(Action::GetSchema), + ), + ) + .service( + // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream + web::resource("/stats").route( + web::get() + .to(logstream::get_stats) + .authorize_for_stream(Action::GetStats), + ), + ) + .service( + web::resource("/retention") + // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_retention) + .authorize_for_stream(Action::PutRetention), + ) + // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream + .route( + web::get() + .to(logstream::get_retention) + .authorize_for_stream(Action::GetRetention), + ), + ) + .service( + web::resource("/cache") + // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_enable_cache) + .authorize_for_stream(Action::PutCacheEnabled), + ) + // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream + .route( + web::get() + .to(logstream::get_cache_enabled) + .authorize_for_stream(Action::GetCacheEnabled), + ), + ), + ) + } + + // get the factory for the ingest route + pub fn get_ingest_factory() -> Resource { + web::resource("/ingest") + .route( + web::post() + .to(ingest::ingest) + .authorize_for_stream(Action::Ingest), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) + } + + // /v1/logs endpoint to be used for OTEL log ingestion only + pub fn get_ingest_otel_factory() -> Resource { + web::resource("/v1/logs") + .route( + web::post() + .to(ingest::ingest_otel_logs) + .authorize_for_stream(Action::Ingest), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) + } + + // get the oauth webscope + pub fn get_oauth_webscope(oidc_client: Option) -> Scope { + let oauth = web::scope("/o") + 
.service(resource("/login").route(web::get().to(oidc::login))) + .service(resource("/logout").route(web::get().to(oidc::logout))) + .service(resource("/code").route(web::get().to(oidc::reply_login))); + + if let Some(client) = oidc_client { + oauth.app_data(web::Data::from(client)) + } else { + oauth + } + } + + // get the role webscope + pub fn get_user_role_webscope() -> Scope { + web::scope("/role") + // GET Role List + .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) + .service( + // PUT and GET Default Role + resource("/default") + .route(web::put().to(role::put_default).authorize(Action::PutRole)) + .route(web::get().to(role::get_default).authorize(Action::GetRole)), + ) + .service( + // PUT, GET, DELETE Roles + resource("/{name}") + .route(web::put().to(role::put).authorize(Action::PutRole)) + .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) + .route(web::get().to(role::get).authorize(Action::GetRole)), + ) + } + + // get the user webscope + pub fn get_user_webscope() -> Scope { + web::scope("/user") + .service( + web::resource("") + // GET /user => List all users + .route( + web::get() + .to(http::rbac::list_users) + .authorize(Action::ListUser), + ), + ) + .service( + web::resource("/{username}") + // PUT /user/{username} => Create a new user + .route( + web::post() + .to(http::rbac::post_user) + .authorize(Action::PutUser), + ) + // DELETE /user/{username} => Delete a user + .route( + web::delete() + .to(http::rbac::delete_user) + .authorize(Action::DeleteUser), + ) + .wrap(DisAllowRootUser), + ) + .service( + web::resource("/{username}/role") + // PUT /user/{username}/roles => Put roles for user + .route( + web::put() + .to(http::rbac::put_role) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), + ) + .route( + web::get() + .to(http::rbac::get_role) + .authorize_for_user(Action::GetUserRoles), + ), + ) + .service( + web::resource("/{username}/generate-new-password") + // POST /user/{username}/generate-new-password => reset password for this user + .route( + web::post() + .to(http::rbac::post_gen_password) + .authorize(Action::PutUser) + .wrap(DisAllowRootUser), + ), + ) + } + + // get the llm webscope + pub fn get_llm_webscope() -> Scope { + web::scope("/llm").service( + web::resource("").route( + web::post() + .to(llm::make_llm_request) + .authorize(Action::QueryLLM), + ), + ) + } + + // get the live check + // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command + // HEAD "/liveness" + pub fn get_liveness_factory() -> Resource { + web::resource("/liveness") + .route(web::get().to(health_check::liveness)) + .route(web::head().to(health_check::liveness)) + } + + // get the readiness check + // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes + // HEAD "/readiness" + pub fn get_readiness_factory() -> Resource { + web::resource("/readiness") + .route(web::get().to(health_check::readiness)) + .route(web::head().to(health_check::readiness)) + } + + // get the about factory + pub fn get_about_factory() -> Resource { + web::resource("/about").route(web::get().to(about::about).authorize(Action::GetAbout)) + } + + // GET "/" ==> Serve the static frontend directory + pub fn get_generated() -> ResourceFiles { + ResourceFiles::new("/", generate()).resolve_not_found_to_root() + } + + async fn 
initialize(&self) -> anyhow::Result<()> {
+         if let Some(cache_manager) = LocalCacheManager::global() {
+             cache_manager
+                 .validate(CONFIG.parseable.local_cache_size)
+                 .await?;
+         };
+
+         let prometheus = metrics::build_metrics_handler();
+         CONFIG.storage().register_store_metrics(&prometheus);
+
+         migration::run_migration(&CONFIG).await?;
+
+         FILTERS.load().await?;
+         DASHBOARDS.load().await?;
+
+         storage::retention::load_retention_from_global();
+
+         let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await;
+         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
+             sync::object_store_sync().await;
+
+         if CONFIG.parseable.send_analytics {
+             analytics::init_analytics_scheduler()?;
+         }
+
+         tokio::spawn(handlers::livetail::server());
+         tokio::spawn(handlers::airplane::server());
+
+         let app = self.start(prometheus, CONFIG.parseable.openid.clone());
+
+         tokio::pin!(app);
+         loop {
+             tokio::select! {
+                 e = &mut app => {
+                     // actix server finished .. stop other threads and stop the server
+                     remote_sync_inbox.send(()).unwrap_or(());
+                     localsync_inbox.send(()).unwrap_or(());
+                     if let Err(e) = localsync_handler.await {
+                         log::error!("Error joining localsync_handler: {:?}", e);
+                     }
+                     if let Err(e) = remote_sync_handler.await {
+                         log::error!("Error joining remote_sync_handler: {:?}", e);
+                     }
+                     return e
+                 },
+                 _ = &mut localsync_outbox => {
+                     // crash the server if localsync fails for any reason
+                     // panic!("Local Sync thread died. Server will fail now!")
+                     return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
+                 },
+                 _ = &mut remote_sync_outbox => {
+                     // remote_sync failed, this is recoverable by just starting remote_sync thread again
+                     if let Err(e) = remote_sync_handler.await {
+                         log::error!("Error joining remote_sync_handler: {:?}", e);
+                     }
+                     (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
+                 }
+             };
+         }
+     }
+ }
+ 
\ No newline at end of file
diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs
index 56c0efa0c..7e0d1827a 100644
--- a/server/src/storage/retention.rs
+++ b/server/src/storage/retention.rs
@@ -16,93 +16,76 @@
  *
  */
 
-use std::hash::Hash;
-use std::num::NonZeroU32;
-use std::sync::Mutex;
-use std::thread;
-use std::time::Duration;
-
-use clokwerk::AsyncScheduler;
-use clokwerk::Job;
-use clokwerk::TimeUnits;
-use derive_more::Display;
-use once_cell::sync::Lazy;
-
-use crate::metadata::STREAM_INFO;
-
-type SchedulerHandle = thread::JoinHandle<()>;
-
-static SCHEDULER_HANDLER: Lazy<Mutex<Option<SchedulerHandle>>> = Lazy::new(|| Mutex::new(None));
-
-fn async_runtime() -> tokio::runtime::Runtime {
-    tokio::runtime::Builder::new_current_thread()
-        .thread_name("retention-task-thread")
-        .enable_all()
-        .build()
-        .unwrap()
-}
-
-pub fn load_retention_from_global() {
-    log::info!("loading retention for all streams");
-    init_scheduler();
-}
-
-pub fn init_scheduler() {
-    log::info!("Setting up schedular");
-    let mut scheduler = AsyncScheduler::new();
-    let func = move || async {
-        //get retention every day at 12 am
-        for stream in STREAM_INFO.list_streams() {
-            let retention = STREAM_INFO.get_retention(&stream);
-
-            match retention {
-                Ok(config) => {
-                    if config.is_none() {
-                        continue;
-                    }
-                    for Task { action, days, .. 
} in config.unwrap().tasks.into_iter() {
-                        match action {
-                            Action::Delete => {
-                                let stream = stream.to_string();
-                                thread::spawn(move || {
-                                    let rt = tokio::runtime::Runtime::new().unwrap();
-                                    rt.block_on(async {
-                                        // Run the asynchronous delete action
-                                        action::delete(stream.clone(), u32::from(days)).await;
-                                    });
-                                });
-                            }
-                        };
-                    }
-                }
-                Err(err) => {
-                    log::warn!("failed to load retention config for {stream} due to {err:?}")
-                }
-            };
-        }
-    };
-
-    // Execute once on startup
-    thread::spawn(move || {
-        let rt = async_runtime();
-        rt.block_on(func());
-    });
-
-    scheduler.every(1.day()).at("00:00").run(func);
-
-    let scheduler_handler = thread::spawn(|| {
-        let rt = async_runtime();
-        rt.block_on(async move {
-            loop {
-                tokio::time::sleep(Duration::from_secs(10)).await;
-                scheduler.run_pending().await;
-            }
-        });
-    });
-
-    *SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler);
-    log::info!("Scheduler is initialized")
-}
+ use std::hash::Hash;
+ use std::num::NonZeroU32;
+ use std::sync::Mutex;
+ use std::time::Duration;
+
+ use clokwerk::AsyncScheduler;
+ use clokwerk::Job;
+ use clokwerk::TimeUnits;
+ use derive_more::Display;
+ use once_cell::sync::Lazy;
+ use tokio::task::JoinHandle;
+
+ use crate::metadata::STREAM_INFO;
+
+ type SchedulerHandle = JoinHandle<()>;
+
+ static SCHEDULER_HANDLER: Lazy<Mutex<Option<SchedulerHandle>>> = Lazy::new(|| Mutex::new(None));
+
+ pub fn load_retention_from_global() {
+     log::info!("loading retention for all streams");
+     init_scheduler();
+ }
+
+ pub fn init_scheduler() {
+     log::info!("Setting up scheduler");
+     let mut scheduler = AsyncScheduler::new();
+     let func = move || async {
+         //get retention every day at 12 am
+         for stream in STREAM_INFO.list_streams() {
+             let retention = STREAM_INFO.get_retention(&stream);
+
+             match retention {
+                 Ok(config) => {
+                     if let Some(config) = config {
+                         for Task { action, days, .. 
} in config.tasks.into_iter() { + match action { + Action::Delete => { + let stream = stream.to_string(); + tokio::spawn(async move { + action::delete(stream.clone(), u32::from(days)).await; + }); + } + }; + } + } + } + Err(err) => { + log::warn!("failed to load retention config for {stream} due to {err:?}") + } + }; + } + }; + + // Execute once on startup + tokio::spawn(async move { + func().await; + }); + + scheduler.every(1.day()).at("00:00").run(func); + + let scheduler_handler = tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + scheduler.run_pending().await; + } + }); + + *SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler); + log::info!("Scheduler is initialized") + } #[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] #[serde(try_from = "Vec")] diff --git a/server/src/sync.rs b/server/src/sync.rs index b44dcde13..6a19b333d 100644 --- a/server/src/sync.rs +++ b/server/src/sync.rs @@ -16,97 +16,52 @@ * */ -use clokwerk::{AsyncScheduler, Job, Scheduler, TimeUnits}; -use thread_priority::{ThreadBuilder, ThreadPriority}; -use tokio::sync::oneshot; -use tokio::sync::oneshot::error::TryRecvError; - -use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::thread::{self, JoinHandle}; -use std::time::Duration; - -use crate::option::CONFIG; -use crate::{storage, STORAGE_UPLOAD_INTERVAL}; - -pub fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { - let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); - let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); - let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let handle = thread::spawn(move || { - let res = catch_unwind(move || { - let rt = actix_web::rt::System::new(); - rt.block_on(async { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(STORAGE_UPLOAD_INTERVAL.seconds()) - // Extra time interval is added so that this schedular does not race with local sync. - .plus(5u32.seconds()) - .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().sync().await { - log::warn!("failed to sync local data with object store. {:?}", e); - } - }); - - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - scheduler.run_pending().await; - match AssertUnwindSafe(|| inbox_rx.try_recv())() { - Ok(_) => break, - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Closed) => { - // should be unreachable but breaking anyways - break; - } - } - } - }) - }); - - if res.is_err() { - outbox_tx.send(()).unwrap(); - } - }); - - (handle, outbox_rx, inbox_tx) -} - -pub fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { - let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); - let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); - let mut inbox_rx = AssertUnwindSafe(inbox_rx); - - let handle = ThreadBuilder::default() - .name("local-sync") - .priority(ThreadPriority::Max) - .spawn(move |priority_result| { - if priority_result.is_err() { - log::warn!("Max priority cannot be set for sync thread. 
Make sure that user/program is allowed to set thread priority.")
-            }
-            let res = catch_unwind(move || {
-                let mut scheduler = Scheduler::new();
-                scheduler
-                    .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
-                    .run(move || crate::event::STREAM_WRITERS.unset_all());
-
-                loop {
-                    thread::sleep(Duration::from_millis(50));
-                    scheduler.run_pending();
-                    match AssertUnwindSafe(|| inbox_rx.try_recv())() {
-                        Ok(_) => break,
-                        Err(TryRecvError::Empty) => continue,
-                        Err(TryRecvError::Closed) => {
-                            // should be unreachable but breaking anyways
-                            break;
-                        }
-                    }
-                }
-            });
-
-            if res.is_err() {
-                outbox_tx.send(()).unwrap();
-            }
-        })
-        .unwrap();
-
-    (handle, outbox_rx, inbox_tx)
-}
+ use tokio::task;
+ use tokio::time::{interval, Duration};
+ use tokio::sync::oneshot;
+ use tokio::select;
+
+ use crate::option::CONFIG;
+ use crate::{storage, STORAGE_UPLOAD_INTERVAL};
+
+ pub async fn object_store_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
+     let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
+     let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();
+
+     let handle = task::spawn(async move {
+         // keep the sender alive inside the task: it is dropped only when the
+         // task exits, which resolves the caller's `outbox_rx` and signals
+         // that the sync task has died (as the old thread did by sending here)
+         let _outbox_tx = outbox_tx;
+         let mut interval = interval(Duration::from_secs((STORAGE_UPLOAD_INTERVAL + 5).into()));
+
+         loop {
+             select! {
+                 _ = interval.tick() => {
+                     if let Err(e) = CONFIG.storage().get_object_store().sync().await {
+                         log::warn!("failed to sync local data with object store. {:?}", e);
+                     }
+                 }
+                 _ = &mut inbox_rx => break,
+             }
+         }
+     });
+
+     (handle, outbox_rx, inbox_tx)
+ }
+
+ pub async fn run_local_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
+     let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
+     let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();
+
+     let handle = task::spawn(async move {
+         // as above, keep the sender alive so task death is observable
+         let _outbox_tx = outbox_tx;
+         let mut interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL));
+
+         loop {
+             select! 
{
+                 _ = interval.tick() => {
+                     crate::event::STREAM_WRITERS.unset_all();
+                 }
+                 _ = &mut inbox_rx => break,
+             }
+         }
+     });
+
+     (handle, outbox_rx, inbox_tx)
+ }

From f15b1378fd6296dfdc51c8e89a5351ed85d1bf31 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Mon, 29 Jul 2024 22:41:33 +0530
Subject: [PATCH 02/12] corrected indentation

---
 server/src/handlers/http/modal/server.rs | 1085 +++++++++++-----------
 server/src/sync.rs                       |   86 +-
 2 files changed, 585 insertions(+), 586 deletions(-)

diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index 08202013d..359692b23 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -16,546 +16,545 @@
  *
  */
 
- use crate::analytics;
- use crate::banner;
- use crate::handlers;
- use crate::handlers::http::about;
- use crate::handlers::http::base_path;
- use crate::handlers::http::cache;
- use crate::handlers::http::health_check;
- use crate::handlers::http::query;
- use crate::handlers::http::users::dashboards;
- use crate::handlers::http::users::filters;
- use crate::handlers::http::API_BASE_PATH;
- use crate::handlers::http::API_VERSION;
- use crate::localcache::LocalCacheManager;
- use crate::metrics;
- use crate::migration;
- use crate::rbac;
- use crate::storage;
- use crate::sync;
- use crate::users::dashboards::DASHBOARDS;
- use crate::users::filters::FILTERS;
- use std::sync::Arc;
-
- use actix_web::web::resource;
- use actix_web::Resource;
- use actix_web::Scope;
- use actix_web::{web, App, HttpServer};
- use actix_web_prometheus::PrometheusMetrics;
- use actix_web_static_files::ResourceFiles;
- use async_trait::async_trait;
-
- use crate::{
-     handlers::http::{
-         self, cross_origin_config, ingest, llm, logstream,
-         middleware::{DisAllowRootUser, RouteExt},
-         oidc, role, MAX_EVENT_PAYLOAD_SIZE,
-     },
-     option::CONFIG,
-     rbac::role::Action,
- };
-
- // use super::generate;
- use super::generate;
- use super::ssl_acceptor::get_ssl_acceptor;
- use super::OpenIdClient;
- use super::ParseableServer;
-
- #[derive(Default)]
- pub struct Server;
-
- #[async_trait(?Send)]
- impl ParseableServer for Server {
-     async fn start(
-         &self,
-         prometheus: PrometheusMetrics,
-         oidc_client: Option,
-     ) -> anyhow::Result<()> {
-         let oidc_client = match oidc_client {
-             Some(config) => {
-                 let client = config
-                     .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
-                     .await?;
-                 Some(Arc::new(client))
-             }
-             None => None,
-         };
-
-         let create_app_fn = move || {
-             App::new()
-                 .wrap(prometheus.clone())
-                 .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone()))
-                 .wrap(actix_web::middleware::Logger::default())
-                 .wrap(actix_web::middleware::Compress::default())
-                 .wrap(cross_origin_config())
-         };
-
-         let ssl = get_ssl_acceptor(
-             &CONFIG.parseable.tls_cert_path,
-             &CONFIG.parseable.tls_key_path,
-         )?;
-
-         // concurrent workers equal to number of cores on the cpu
-         let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
-         if let Some(config) = ssl {
-             http_server
-                 .bind_rustls_0_22(&CONFIG.parseable.address, config)? 
- .run() - .await?; - } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } - - Ok(()) - } - - /// implementation of init should just invoke a call to initialize - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - metadata.set_global(); - self.initialize().await?; - Ok(()) - } - - fn validate(&self) -> anyhow::Result<()> { - Ok(()) - } - } - - impl Server { - fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option) { - // there might be a bug in the configure routes method - config - .service( - web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body - .service(Self::get_query_factory()) - .service(Self::get_cache_webscope()) - .service(Self::get_ingest_factory()) - .service(Self::get_liveness_factory()) - .service(Self::get_readiness_factory()) - .service(Self::get_about_factory()) - .service(Self::get_logstream_webscope()) - .service(Self::get_user_webscope()) - .service(Self::get_dashboards_webscope()) - .service(Self::get_filters_webscope()) - .service(Self::get_llm_webscope()) - .service(Self::get_oauth_webscope(oidc_client)) - .service(Self::get_user_role_webscope()), - ) - .service(Self::get_ingest_otel_factory()) - .service(Self::get_generated()); - } - - // get the dashboards web scope - pub fn get_dashboards_webscope() -> Scope { - web::scope("/dashboards") - .service( - web::resource("").route( - web::post() - .to(dashboards::post) - .authorize(Action::CreateDashboard), - ), - ) - .service( - web::scope("/dashboard").service( - web::resource("/{dashboard_id}") - .route( - web::get() - .to(dashboards::get) - .authorize(Action::GetDashboard), - ) - .route( - web::delete() - .to(dashboards::delete) - .authorize(Action::DeleteDashboard), - ) - .route( - web::put() - .to(dashboards::update) - .authorize(Action::CreateDashboard), - ), - ), - ) - .service( - web::scope("/{user_id}").service( - web::resource("").route( - web::get() - .to(dashboards::list) - .authorize(Action::ListDashboard), - ), - ), - ) - } - - // get the filters web scope - pub fn get_filters_webscope() -> Scope { - web::scope("/filters") - .service( - web::resource("").route( - web::post() - .to(filters::post) - .authorize(Action::CreateFilter), - ), - ) - .service( - web::scope("/filter").service( - web::resource("/{filter_id}") - .route(web::get().to(filters::get).authorize(Action::GetFilter)) - .route( - web::delete() - .to(filters::delete) - .authorize(Action::DeleteFilter), - ) - .route( - web::put() - .to(filters::update) - .authorize(Action::CreateFilter), - ), - ), - ) - .service(web::scope("/{user_id}").service( - web::resource("").route(web::get().to(filters::list).authorize(Action::ListFilter)), - )) - } - - // get the query factory - pub fn get_query_factory() -> Resource { - web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) - } - - pub fn get_cache_webscope() -> Scope { - web::scope("/cache").service( - web::scope("/{user_id}").service( - web::scope("/{stream}").service( - web::resource("") - .route(web::get().to(cache::list).authorize(Action::ListCache)) - .route(web::post().to(cache::remove).authorize(Action::RemoveCache)), - ), - ), - ) - } - - // 
get the logstream web scope - pub fn get_logstream_webscope() -> Scope { - web::scope("/logstream") - .service( - // GET "/logstream" ==> Get list of all Log Streams on the server - web::resource("") - .route(web::get().to(logstream::list).authorize(Action::ListStream)), - ) - .service( - web::scope("/{logstream}") - .service( - web::resource("") - // PUT "/logstream/{logstream}" ==> Create log stream - .route( - web::put() - .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), - ) - // POST "/logstream/{logstream}" ==> Post logs to given log stream - .route( - web::post() - .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), - ) - // DELETE "/logstream/{logstream}" ==> Delete log stream - .route( - web::delete() - .to(logstream::delete) - .authorize_for_stream(Action::DeleteStream), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) - .service( - // GET "/logstream/{logstream}/info" ==> Get info for given log stream - web::resource("/info").route( - web::get() - .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStream), - ), - ) - .service( - web::resource("/alert") - // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream - .route( - web::put() - .to(logstream::put_alert) - .authorize_for_stream(Action::PutAlert), - ) - // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream - .route( - web::get() - .to(logstream::get_alert) - .authorize_for_stream(Action::GetAlert), - ), - ) - .service( - // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource("/schema").route( - web::get() - .to(logstream::schema) - .authorize_for_stream(Action::GetSchema), - ), - ) - .service( - // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream - web::resource("/stats").route( - web::get() - .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), - ), - ) - .service( - web::resource("/retention") - // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_retention) - .authorize_for_stream(Action::PutRetention), - ) - // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream - .route( - web::get() - .to(logstream::get_retention) - .authorize_for_stream(Action::GetRetention), - ), - ) - .service( - web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ) - // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream - .route( - web::get() - .to(logstream::get_cache_enabled) - .authorize_for_stream(Action::GetCacheEnabled), - ), - ), - ) - } - - // get the factory for the ingest route - pub fn get_ingest_factory() -> Resource { - web::resource("/ingest") - .route( - web::post() - .to(ingest::ingest) - .authorize_for_stream(Action::Ingest), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) - } - - // /v1/logs endpoint to be used for OTEL log ingestion only - pub fn get_ingest_otel_factory() -> Resource { - web::resource("/v1/logs") - .route( - web::post() - .to(ingest::ingest_otel_logs) - .authorize_for_stream(Action::Ingest), - ) - .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) - } - - // get the oauth webscope - pub fn get_oauth_webscope(oidc_client: Option) -> Scope { - let oauth = web::scope("/o") - 
.service(resource("/login").route(web::get().to(oidc::login))) - .service(resource("/logout").route(web::get().to(oidc::logout))) - .service(resource("/code").route(web::get().to(oidc::reply_login))); - - if let Some(client) = oidc_client { - oauth.app_data(web::Data::from(client)) - } else { - oauth - } - } - - // get the role webscope - pub fn get_user_role_webscope() -> Scope { - web::scope("/role") - // GET Role List - .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) - .service( - // PUT and GET Default Role - resource("/default") - .route(web::put().to(role::put_default).authorize(Action::PutRole)) - .route(web::get().to(role::get_default).authorize(Action::GetRole)), - ) - .service( - // PUT, GET, DELETE Roles - resource("/{name}") - .route(web::put().to(role::put).authorize(Action::PutRole)) - .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) - .route(web::get().to(role::get).authorize(Action::GetRole)), - ) - } - - // get the user webscope - pub fn get_user_webscope() -> Scope { - web::scope("/user") - .service( - web::resource("") - // GET /user => List all users - .route( - web::get() - .to(http::rbac::list_users) - .authorize(Action::ListUser), - ), - ) - .service( - web::resource("/{username}") - // PUT /user/{username} => Create a new user - .route( - web::post() - .to(http::rbac::post_user) - .authorize(Action::PutUser), - ) - // DELETE /user/{username} => Delete a user - .route( - web::delete() - .to(http::rbac::delete_user) - .authorize(Action::DeleteUser), - ) - .wrap(DisAllowRootUser), - ) - .service( - web::resource("/{username}/role") - // PUT /user/{username}/roles => Put roles for user - .route( - web::put() - .to(http::rbac::put_role) - .authorize(Action::PutUserRoles) - .wrap(DisAllowRootUser), - ) - .route( - web::get() - .to(http::rbac::get_role) - .authorize_for_user(Action::GetUserRoles), - ), - ) - .service( - web::resource("/{username}/generate-new-password") - // POST /user/{username}/generate-new-password => reset password for this user - .route( - web::post() - .to(http::rbac::post_gen_password) - .authorize(Action::PutUser) - .wrap(DisAllowRootUser), - ), - ) - } - - // get the llm webscope - pub fn get_llm_webscope() -> Scope { - web::scope("/llm").service( - web::resource("").route( - web::post() - .to(llm::make_llm_request) - .authorize(Action::QueryLLM), - ), - ) - } - - // get the live check - // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command - // HEAD "/liveness" - pub fn get_liveness_factory() -> Resource { - web::resource("/liveness") - .route(web::get().to(health_check::liveness)) - .route(web::head().to(health_check::liveness)) - } - - // get the readiness check - // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes - // HEAD "/readiness" - pub fn get_readiness_factory() -> Resource { - web::resource("/readiness") - .route(web::get().to(health_check::readiness)) - .route(web::head().to(health_check::readiness)) - } - - // get the about factory - pub fn get_about_factory() -> Resource { - web::resource("/about").route(web::get().to(about::about).authorize(Action::GetAbout)) - } - - // GET "/" ==> Serve the static frontend directory - pub fn get_generated() -> ResourceFiles { - ResourceFiles::new("/", generate()).resolve_not_found_to_root() - } - - async fn 
initialize(&self) -> anyhow::Result<()> { - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - - storage::retention::load_retention_from_global(); - - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - tokio::spawn(handlers::livetail::server()); - tokio::spawn(handlers::airplane::server()); - - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - }; - } - } - } - \ No newline at end of file +use crate::analytics; +use crate::banner; +use crate::handlers; +use crate::handlers::http::about; +use crate::handlers::http::base_path; +use crate::handlers::http::cache; +use crate::handlers::http::health_check; +use crate::handlers::http::query; +use crate::handlers::http::users::dashboards; +use crate::handlers::http::users::filters; +use crate::handlers::http::API_BASE_PATH; +use crate::handlers::http::API_VERSION; +use crate::localcache::LocalCacheManager; +use crate::metrics; +use crate::migration; +use crate::rbac; +use crate::storage; +use crate::sync; +use crate::users::dashboards::DASHBOARDS; +use crate::users::filters::FILTERS; +use std::sync::Arc; + +use actix_web::web::resource; +use actix_web::Resource; +use actix_web::Scope; +use actix_web::{web, App, HttpServer}; +use actix_web_prometheus::PrometheusMetrics; +use actix_web_static_files::ResourceFiles; +use async_trait::async_trait; + +use crate::{ + handlers::http::{ + self, cross_origin_config, ingest, llm, logstream, + middleware::{DisAllowRootUser, RouteExt}, + oidc, role, MAX_EVENT_PAYLOAD_SIZE, + }, + option::CONFIG, + rbac::role::Action, +}; + +// use super::generate; +use super::generate; +use super::ssl_acceptor::get_ssl_acceptor; +use super::OpenIdClient; +use super::ParseableServer; + +#[derive(Default)] +pub struct Server; + +#[async_trait(?Send)] +impl ParseableServer for Server { + async fn start( + &self, + prometheus: 
PrometheusMetrics, + oidc_client: Option, + ) -> anyhow::Result<()> { + let oidc_client = match oidc_client { + Some(config) => { + let client = config + .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) + .await?; + Some(Arc::new(client)) + } + None => None, + }; + + let create_app_fn = move || { + App::new() + .wrap(prometheus.clone()) + .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone())) + .wrap(actix_web::middleware::Logger::default()) + .wrap(actix_web::middleware::Compress::default()) + .wrap(cross_origin_config()) + }; + + let ssl = get_ssl_acceptor( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + )?; + + // concurrent workers equal to number of cores on the cpu + let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); + if let Some(config) = ssl { + http_server + .bind_rustls_0_22(&CONFIG.parseable.address, config)? + .run() + .await?; + } else { + http_server.bind(&CONFIG.parseable.address)?.run().await?; + } + + Ok(()) + } + + /// implementation of init should just invoke a call to initialize + async fn init(&self) -> anyhow::Result<()> { + self.validate()?; + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + banner::print(&CONFIG, &metadata).await; + rbac::map::init(&metadata); + metadata.set_global(); + self.initialize().await?; + Ok(()) + } + + fn validate(&self) -> anyhow::Result<()> { + Ok(()) + } +} + +impl Server { + fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option) { + // there might be a bug in the configure routes method + config + .service( + web::scope(&base_path()) + // POST "/query" ==> Get results of the SQL query passed in request body + .service(Self::get_query_factory()) + .service(Self::get_cache_webscope()) + .service(Self::get_ingest_factory()) + .service(Self::get_liveness_factory()) + .service(Self::get_readiness_factory()) + .service(Self::get_about_factory()) + .service(Self::get_logstream_webscope()) + .service(Self::get_user_webscope()) + .service(Self::get_dashboards_webscope()) + .service(Self::get_filters_webscope()) + .service(Self::get_llm_webscope()) + .service(Self::get_oauth_webscope(oidc_client)) + .service(Self::get_user_role_webscope()), + ) + .service(Self::get_ingest_otel_factory()) + .service(Self::get_generated()); + } + + // get the dashboards web scope + pub fn get_dashboards_webscope() -> Scope { + web::scope("/dashboards") + .service( + web::resource("").route( + web::post() + .to(dashboards::post) + .authorize(Action::CreateDashboard), + ), + ) + .service( + web::scope("/dashboard").service( + web::resource("/{dashboard_id}") + .route( + web::get() + .to(dashboards::get) + .authorize(Action::GetDashboard), + ) + .route( + web::delete() + .to(dashboards::delete) + .authorize(Action::DeleteDashboard), + ) + .route( + web::put() + .to(dashboards::update) + .authorize(Action::CreateDashboard), + ), + ), + ) + .service( + web::scope("/{user_id}").service( + web::resource("").route( + web::get() + .to(dashboards::list) + .authorize(Action::ListDashboard), + ), + ), + ) + } + + // get the filters web scope + pub fn get_filters_webscope() -> Scope { + web::scope("/filters") + .service( + web::resource("").route( + web::post() + .to(filters::post) + .authorize(Action::CreateFilter), + ), + ) + .service( + web::scope("/filter").service( + 
web::resource("/{filter_id}") + .route(web::get().to(filters::get).authorize(Action::GetFilter)) + .route( + web::delete() + .to(filters::delete) + .authorize(Action::DeleteFilter), + ) + .route( + web::put() + .to(filters::update) + .authorize(Action::CreateFilter), + ), + ), + ) + .service(web::scope("/{user_id}").service( + web::resource("").route(web::get().to(filters::list).authorize(Action::ListFilter)), + )) + } + + // get the query factory + pub fn get_query_factory() -> Resource { + web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) + } + + pub fn get_cache_webscope() -> Scope { + web::scope("/cache").service( + web::scope("/{user_id}").service( + web::scope("/{stream}").service( + web::resource("") + .route(web::get().to(cache::list).authorize(Action::ListCache)) + .route(web::post().to(cache::remove).authorize(Action::RemoveCache)), + ), + ), + ) + } + + // get the logstream web scope + pub fn get_logstream_webscope() -> Scope { + web::scope("/logstream") + .service( + // GET "/logstream" ==> Get list of all Log Streams on the server + web::resource("") + .route(web::get().to(logstream::list).authorize(Action::ListStream)), + ) + .service( + web::scope("/{logstream}") + .service( + web::resource("") + // PUT "/logstream/{logstream}" ==> Create log stream + .route( + web::put() + .to(logstream::put_stream) + .authorize_for_stream(Action::CreateStream), + ) + // POST "/logstream/{logstream}" ==> Post logs to given log stream + .route( + web::post() + .to(ingest::post_event) + .authorize_for_stream(Action::Ingest), + ) + // DELETE "/logstream/{logstream}" ==> Delete log stream + .route( + web::delete() + .to(logstream::delete) + .authorize_for_stream(Action::DeleteStream), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) + .service( + // GET "/logstream/{logstream}/info" ==> Get info for given log stream + web::resource("/info").route( + web::get() + .to(logstream::get_stream_info) + .authorize_for_stream(Action::GetStream), + ), + ) + .service( + web::resource("/alert") + // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream + .route( + web::put() + .to(logstream::put_alert) + .authorize_for_stream(Action::PutAlert), + ) + // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream + .route( + web::get() + .to(logstream::get_alert) + .authorize_for_stream(Action::GetAlert), + ), + ) + .service( + // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream + web::resource("/schema").route( + web::get() + .to(logstream::schema) + .authorize_for_stream(Action::GetSchema), + ), + ) + .service( + // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream + web::resource("/stats").route( + web::get() + .to(logstream::get_stats) + .authorize_for_stream(Action::GetStats), + ), + ) + .service( + web::resource("/retention") + // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_retention) + .authorize_for_stream(Action::PutRetention), + ) + // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream + .route( + web::get() + .to(logstream::get_retention) + .authorize_for_stream(Action::GetRetention), + ), + ) + .service( + web::resource("/cache") + // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_enable_cache) + .authorize_for_stream(Action::PutCacheEnabled), + ) + // GET "/logstream/{logstream}/cache" ==> 
Get retention for given logstream + .route( + web::get() + .to(logstream::get_cache_enabled) + .authorize_for_stream(Action::GetCacheEnabled), + ), + ), + ) + } + + // get the factory for the ingest route + pub fn get_ingest_factory() -> Resource { + web::resource("/ingest") + .route( + web::post() + .to(ingest::ingest) + .authorize_for_stream(Action::Ingest), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) + } + + // /v1/logs endpoint to be used for OTEL log ingestion only + pub fn get_ingest_otel_factory() -> Resource { + web::resource("/v1/logs") + .route( + web::post() + .to(ingest::ingest_otel_logs) + .authorize_for_stream(Action::Ingest), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) + } + + // get the oauth webscope + pub fn get_oauth_webscope(oidc_client: Option) -> Scope { + let oauth = web::scope("/o") + .service(resource("/login").route(web::get().to(oidc::login))) + .service(resource("/logout").route(web::get().to(oidc::logout))) + .service(resource("/code").route(web::get().to(oidc::reply_login))); + + if let Some(client) = oidc_client { + oauth.app_data(web::Data::from(client)) + } else { + oauth + } + } + + // get the role webscope + pub fn get_user_role_webscope() -> Scope { + web::scope("/role") + // GET Role List + .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) + .service( + // PUT and GET Default Role + resource("/default") + .route(web::put().to(role::put_default).authorize(Action::PutRole)) + .route(web::get().to(role::get_default).authorize(Action::GetRole)), + ) + .service( + // PUT, GET, DELETE Roles + resource("/{name}") + .route(web::put().to(role::put).authorize(Action::PutRole)) + .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) + .route(web::get().to(role::get).authorize(Action::GetRole)), + ) + } + + // get the user webscope + pub fn get_user_webscope() -> Scope { + web::scope("/user") + .service( + web::resource("") + // GET /user => List all users + .route( + web::get() + .to(http::rbac::list_users) + .authorize(Action::ListUser), + ), + ) + .service( + web::resource("/{username}") + // PUT /user/{username} => Create a new user + .route( + web::post() + .to(http::rbac::post_user) + .authorize(Action::PutUser), + ) + // DELETE /user/{username} => Delete a user + .route( + web::delete() + .to(http::rbac::delete_user) + .authorize(Action::DeleteUser), + ) + .wrap(DisAllowRootUser), + ) + .service( + web::resource("/{username}/role") + // PUT /user/{username}/roles => Put roles for user + .route( + web::put() + .to(http::rbac::put_role) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), + ) + .route( + web::get() + .to(http::rbac::get_role) + .authorize_for_user(Action::GetUserRoles), + ), + ) + .service( + web::resource("/{username}/generate-new-password") + // POST /user/{username}/generate-new-password => reset password for this user + .route( + web::post() + .to(http::rbac::post_gen_password) + .authorize(Action::PutUser) + .wrap(DisAllowRootUser), + ), + ) + } + + // get the llm webscope + pub fn get_llm_webscope() -> Scope { + web::scope("/llm").service( + web::resource("").route( + web::post() + .to(llm::make_llm_request) + .authorize(Action::QueryLLM), + ), + ) + } + + // get the live check + // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command + // HEAD "/liveness" + pub fn get_liveness_factory() -> Resource { 
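+        // Both GET and HEAD are routed to the same handler, since HTTP probes may use either verb.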
+        web::resource("/liveness")
+            .route(web::get().to(health_check::liveness))
+            .route(web::head().to(health_check::liveness))
+    }
+
+    // get the readiness check
+    // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
+    // HEAD "/readiness"
+    pub fn get_readiness_factory() -> Resource {
+        web::resource("/readiness")
+            .route(web::get().to(health_check::readiness))
+            .route(web::head().to(health_check::readiness))
+    }
+
+    // get the about factory
+    pub fn get_about_factory() -> Resource {
+        web::resource("/about").route(web::get().to(about::about).authorize(Action::GetAbout))
+    }
+
+    // GET "/" ==> Serve the static frontend directory
+    pub fn get_generated() -> ResourceFiles {
+        ResourceFiles::new("/", generate()).resolve_not_found_to_root()
+    }
+
+    async fn initialize(&self) -> anyhow::Result<()> {
+        if let Some(cache_manager) = LocalCacheManager::global() {
+            cache_manager
+                .validate(CONFIG.parseable.local_cache_size)
+                .await?;
+        };
+
+        let prometheus = metrics::build_metrics_handler();
+        CONFIG.storage().register_store_metrics(&prometheus);
+
+        migration::run_migration(&CONFIG).await?;
+
+        FILTERS.load().await?;
+        DASHBOARDS.load().await?;
+
+        storage::retention::load_retention_from_global();
+
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await;
+        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
+            sync::object_store_sync().await;
+
+        if CONFIG.parseable.send_analytics {
+            analytics::init_analytics_scheduler()?;
+        }
+
+        tokio::spawn(handlers::livetail::server());
+        tokio::spawn(handlers::airplane::server());
+
+        let app = self.start(prometheus, CONFIG.parseable.openid.clone());
+
+        tokio::pin!(app);
+        loop {
+            tokio::select! {
+                e = &mut app => {
+                    // actix server finished .. stop other threads and stop the server
+                    remote_sync_inbox.send(()).unwrap_or(());
+                    localsync_inbox.send(()).unwrap_or(());
+                    if let Err(e) = localsync_handler.await {
+                        log::error!("Error joining localsync_handler: {:?}", e);
+                    }
+                    if let Err(e) = remote_sync_handler.await {
+                        log::error!("Error joining remote_sync_handler: {:?}", e);
+                    }
+                    return e
+                },
+                _ = &mut localsync_outbox => {
+                    // crash the server if localsync fails for any reason
+                    // panic!("Local Sync thread died. Server will fail now!")
+                    return Err(anyhow::Error::msg("Failed to sync local data to drive. 
Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + }; + } + } +} diff --git a/server/src/sync.rs b/server/src/sync.rs index 6a19b333d..41e388ec0 100644 --- a/server/src/sync.rs +++ b/server/src/sync.rs @@ -16,52 +16,52 @@ * */ - use tokio::task; - use tokio::time::{interval, Duration}; - use tokio::sync::oneshot; - use tokio::select; +use tokio::task; +use tokio::time::{interval, Duration}; +use tokio::sync::oneshot; +use tokio::select; - use crate::option::CONFIG; - use crate::{storage, STORAGE_UPLOAD_INTERVAL}; +use crate::option::CONFIG; +use crate::{storage, STORAGE_UPLOAD_INTERVAL}; - pub async fn object_store_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { - let (_outbox_tx, outbox_rx) = oneshot::channel::<()>(); - let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>(); +pub async fn object_store_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { + let (_outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>(); - let handle = task::spawn(async move { - let mut interval = interval(Duration::from_secs((STORAGE_UPLOAD_INTERVAL + 5).into())); + let handle = task::spawn(async move { + let mut interval = interval(Duration::from_secs((STORAGE_UPLOAD_INTERVAL + 5).into())); - loop { - select! { - _ = interval.tick() => { - if let Err(e) = CONFIG.storage().get_object_store().sync().await { - log::warn!("failed to sync local data with object store. {:?}", e); - } - } - _ = &mut inbox_rx => break, - } - } - }); + loop { + select! { + _ = interval.tick() => { + if let Err(e) = CONFIG.storage().get_object_store().sync().await { + log::warn!("failed to sync local data with object store. {:?}", e); + } + } + _ = &mut inbox_rx => break, + } + } + }); - (handle, outbox_rx, inbox_tx) - } + (handle, outbox_rx, inbox_tx) +} - pub async fn run_local_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { - let (_outbox_tx, outbox_rx) = oneshot::channel::<()>(); - let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>(); - - let handle = task::spawn(async move { - let mut interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL)); - - loop { - select! { - _ = interval.tick() => { - crate::event::STREAM_WRITERS.unset_all(); - } - _ = &mut inbox_rx => break, - } - } - }); - - (handle, outbox_rx, inbox_tx) - } +pub async fn run_local_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { + let (_outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>(); + + let handle = task::spawn(async move { + let mut interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL)); + + loop { + select! 
{
+                _ = interval.tick() => {
+                    crate::event::STREAM_WRITERS.unset_all();
+                }
+                _ = &mut inbox_rx => break,
+            }
+        }
+    });
+
+    (handle, outbox_rx, inbox_tx)
+}
From dba4bbc54e44d6b7c4697d821c25e1b3bfd2ea1f Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Mon, 29 Jul 2024 23:06:18 +0530
Subject: [PATCH 03/12] fixed indentation

---
 .../src/handlers/http/modal/ingest_server.rs  |   3 +-
 .../src/handlers/http/modal/query_server.rs   | 420 +++++++++---------
 server/src/handlers/http/modal/server.rs      |   3 +-
 server/src/sync.rs                            |  28 +-
 4 files changed, 232 insertions(+), 222 deletions(-)

diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 91eafac6b..a9deaf82d 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -330,7 +330,8 @@ impl IngestServer {
 
         migration::run_migration(&CONFIG).await?;
 
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await;
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
+            sync::run_local_sync().await;
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;
 
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 9e64fde11..6dc97593c 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -16,215 +16,215 @@
  *
  */
 
- use crate::handlers::airplane;
- use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
- use crate::handlers::http::middleware::RouteExt;
- use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
-
- use crate::rbac::role::Action;
- use crate::sync;
- use crate::users::dashboards::DASHBOARDS;
- use crate::users::filters::FILTERS;
- use crate::{analytics, banner, metrics, migration, rbac, storage};
- use actix_web::web;
- use actix_web::web::ServiceConfig;
- use actix_web::{App, HttpServer};
- use async_trait::async_trait;
- use std::sync::Arc;
-
- use crate::option::CONFIG;
-
- use super::server::Server;
- use super::ssl_acceptor::get_ssl_acceptor;
- use super::{OpenIdClient, ParseableServer};
-
- #[derive(Default, Debug)]
- pub struct QueryServer;
-
- #[async_trait(?Send)]
- impl ParseableServer for QueryServer {
-     async fn start(
-         &self,
-         prometheus: actix_web_prometheus::PrometheusMetrics,
-         oidc_client: Option<OpenIdClient>,
-     ) -> anyhow::Result<()> {
-         let oidc_client = match oidc_client {
-             Some(config) => {
-                 let client = config
-                     .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
-                     .await?;
-                 Some(Arc::new(client))
-             }
-
-             None => None,
-         };
-
-         let ssl = get_ssl_acceptor(
-             &CONFIG.parseable.tls_cert_path,
-             &CONFIG.parseable.tls_key_path,
-         )?;
-
-         let create_app_fn = move || {
-             App::new()
-                 .wrap(prometheus.clone())
-                 .configure(|config| QueryServer::configure_routes(config, oidc_client.clone()))
-                 .wrap(actix_web::middleware::Logger::default())
-                 .wrap(actix_web::middleware::Compress::default())
-                 .wrap(cross_origin_config())
-         };
-
-         // concurrent workers equal to number of cores on the cpu
-         let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
-         if let Some(config) = ssl {
-             http_server
-                 .bind_rustls_0_22(&CONFIG.parseable.address, config)?
- .run() - .await?; - } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } - - Ok(()) - } - - /// implementation of init should just invoke a call to initialize - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - // initialize the rbac map - rbac::map::init(&metadata); - // keep metadata info in mem - metadata.set_global(); - self.initialize().await - } - - fn validate(&self) -> anyhow::Result<()> { - if CONFIG.get_storage_mode_string() == "Local drive" { - return Err(anyhow::anyhow!( +use crate::handlers::airplane; +use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; +use crate::handlers::http::middleware::RouteExt; +use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; + +use crate::rbac::role::Action; +use crate::sync; +use crate::users::dashboards::DASHBOARDS; +use crate::users::filters::FILTERS; +use crate::{analytics, banner, metrics, migration, rbac, storage}; +use actix_web::web; +use actix_web::web::ServiceConfig; +use actix_web::{App, HttpServer}; +use async_trait::async_trait; +use std::sync::Arc; + +use crate::option::CONFIG; + +use super::server::Server; +use super::ssl_acceptor::get_ssl_acceptor; +use super::{OpenIdClient, ParseableServer}; + +#[derive(Default, Debug)] +pub struct QueryServer; + +#[async_trait(?Send)] +impl ParseableServer for QueryServer { + async fn start( + &self, + prometheus: actix_web_prometheus::PrometheusMetrics, + oidc_client: Option, + ) -> anyhow::Result<()> { + let oidc_client = match oidc_client { + Some(config) => { + let client = config + .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) + .await?; + Some(Arc::new(client)) + } + + None => None, + }; + + let ssl = get_ssl_acceptor( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + )?; + + let create_app_fn = move || { + App::new() + .wrap(prometheus.clone()) + .configure(|config| QueryServer::configure_routes(config, oidc_client.clone())) + .wrap(actix_web::middleware::Logger::default()) + .wrap(actix_web::middleware::Compress::default()) + .wrap(cross_origin_config()) + }; + + // concurrent workers equal to number of cores on the cpu + let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()); + if let Some(config) = ssl { + http_server + .bind_rustls_0_22(&CONFIG.parseable.address, config)? + .run() + .await?; + } else { + http_server.bind(&CONFIG.parseable.address)?.run().await?; + } + + Ok(()) + } + + /// implementation of init should just invoke a call to initialize + async fn init(&self) -> anyhow::Result<()> { + self.validate()?; + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + banner::print(&CONFIG, &metadata).await; + // initialize the rbac map + rbac::map::init(&metadata); + // keep metadata info in mem + metadata.set_global(); + self.initialize().await + } + + fn validate(&self) -> anyhow::Result<()> { + if CONFIG.get_storage_mode_string() == "Local drive" { + return Err(anyhow::anyhow!( "Query Server cannot be started in local storage mode. 
Please start the server in a supported storage mode.", )); - } - - Ok(()) - } - } - - impl QueryServer { - // configure the api routes - fn configure_routes(config: &mut ServiceConfig, oidc_client: Option) { - config - .service( - web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body - .service(Server::get_query_factory()) - .service(Server::get_cache_webscope()) - .service(Server::get_liveness_factory()) - .service(Server::get_readiness_factory()) - .service(Server::get_about_factory()) - .service(Server::get_logstream_webscope()) - .service(Server::get_user_webscope()) - .service(Server::get_dashboards_webscope()) - .service(Server::get_filters_webscope()) - .service(Server::get_llm_webscope()) - .service(Server::get_oauth_webscope(oidc_client)) - .service(Server::get_user_role_webscope()) - .service(Self::get_cluster_web_scope()), - ) - .service(Server::get_generated()); - } - - fn get_cluster_web_scope() -> actix_web::Scope { - web::scope("/cluster") - .service( - // GET "/cluster/info" ==> Get info of the cluster - web::resource("/info").route( - web::get() - .to(cluster::get_cluster_info) - .authorize(Action::ListCluster), - ), - ) - // GET "/cluster/metrics" ==> Get metrics of the cluster - .service( - web::resource("/metrics").route( - web::get() - .to(cluster::get_cluster_metrics) - .authorize(Action::ListClusterMetrics), - ), - ) - // DELETE "/cluster/{ingestor_domain:port}" ==> Delete an ingestor from the cluster - .service( - web::scope("/{ingestor}").service( - web::resource("").route( - web::delete() - .to(cluster::remove_ingestor) - .authorize(Action::Deleteingestor), - ), - ), - ) - } - - /// initialize the server, run migrations as needed and start the server - async fn initialize(&self) -> anyhow::Result<()> { - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - // track all parquet files already in the data directory - storage::retention::load_retention_from_global(); - - // all internal data structures populated now. - // start the analytics scheduler if enabled - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - if matches!(init_cluster_metrics_schedular(), Ok(())) { - log::info!("Cluster metrics scheduler started successfully"); - } - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - tokio::spawn(airplane::server()); - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining localsync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. 
Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - - }; - } - } - } - \ No newline at end of file + } + + Ok(()) + } +} + +impl QueryServer { + // configure the api routes + fn configure_routes(config: &mut ServiceConfig, oidc_client: Option) { + config + .service( + web::scope(&base_path()) + // POST "/query" ==> Get results of the SQL query passed in request body + .service(Server::get_query_factory()) + .service(Server::get_cache_webscope()) + .service(Server::get_liveness_factory()) + .service(Server::get_readiness_factory()) + .service(Server::get_about_factory()) + .service(Server::get_logstream_webscope()) + .service(Server::get_user_webscope()) + .service(Server::get_dashboards_webscope()) + .service(Server::get_filters_webscope()) + .service(Server::get_llm_webscope()) + .service(Server::get_oauth_webscope(oidc_client)) + .service(Server::get_user_role_webscope()) + .service(Self::get_cluster_web_scope()), + ) + .service(Server::get_generated()); + } + + fn get_cluster_web_scope() -> actix_web::Scope { + web::scope("/cluster") + .service( + // GET "/cluster/info" ==> Get info of the cluster + web::resource("/info").route( + web::get() + .to(cluster::get_cluster_info) + .authorize(Action::ListCluster), + ), + ) + // GET "/cluster/metrics" ==> Get metrics of the cluster + .service( + web::resource("/metrics").route( + web::get() + .to(cluster::get_cluster_metrics) + .authorize(Action::ListClusterMetrics), + ), + ) + // DELETE "/cluster/{ingestor_domain:port}" ==> Delete an ingestor from the cluster + .service( + web::scope("/{ingestor}").service( + web::resource("").route( + web::delete() + .to(cluster::remove_ingestor) + .authorize(Action::Deleteingestor), + ), + ), + ) + } + + /// initialize the server, run migrations as needed and start the server + async fn initialize(&self) -> anyhow::Result<()> { + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + FILTERS.load().await?; + DASHBOARDS.load().await?; + // track all parquet files already in the data directory + storage::retention::load_retention_from_global(); + + // all internal data structures populated now. + // start the analytics scheduler if enabled + if CONFIG.parseable.send_analytics { + analytics::init_analytics_scheduler()?; + } + + if matches!(init_cluster_metrics_schedular(), Ok(())) { + log::info!("Cluster metrics scheduler started successfully"); + } + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + tokio::spawn(airplane::server()); + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. 
stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining localsync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } + } +} diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 359692b23..af4bfcb3c 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -514,7 +514,8 @@ impl Server { storage::retention::load_retention_from_global(); - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await; + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync().await; diff --git a/server/src/sync.rs b/server/src/sync.rs index 41e388ec0..8c06ec70e 100644 --- a/server/src/sync.rs +++ b/server/src/sync.rs @@ -16,21 +16,25 @@ * */ +use tokio::select; +use tokio::sync::oneshot; use tokio::task; use tokio::time::{interval, Duration}; -use tokio::sync::oneshot; -use tokio::select; - + use crate::option::CONFIG; use crate::{storage, STORAGE_UPLOAD_INTERVAL}; - -pub async fn object_store_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { + +pub async fn object_store_sync() -> ( + task::JoinHandle<()>, + oneshot::Receiver<()>, + oneshot::Sender<()>, +) { let (_outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>(); - + let handle = task::spawn(async move { let mut interval = interval(Duration::from_secs((STORAGE_UPLOAD_INTERVAL + 5).into())); - + loop { select! 
{
                 _ = interval.tick() => {
@@ -42,11 +46,15 @@ pub async fn object_store_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>
             }
         }
     });
-
+
     (handle, outbox_rx, inbox_tx)
 }
-
-pub async fn run_local_sync() -> (task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
+
+pub async fn run_local_sync() -> (
+    task::JoinHandle<()>,
+    oneshot::Receiver<()>,
+    oneshot::Sender<()>,
+) {
     let (_outbox_tx, outbox_rx) = oneshot::channel::<()>();
     let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();
From 306dd17ca674ad42d08b2c4aa75f7ce18cf60b88 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Mon, 29 Jul 2024 23:09:21 +0530
Subject: [PATCH 04/12] corrected indentation

---
 server/src/storage/retention.rs | 140 ++++++++++----------
 1 file changed, 70 insertions(+), 70 deletions(-)

diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs
index 7e0d1827a..c65c94c88 100644
--- a/server/src/storage/retention.rs
+++ b/server/src/storage/retention.rs
@@ -16,76 +16,76 @@
  *
  */
 
- use std::hash::Hash;
- use std::num::NonZeroU32;
- use std::sync::Mutex;
- use std::time::Duration;
-
- use clokwerk::AsyncScheduler;
- use clokwerk::Job;
- use clokwerk::TimeUnits;
- use derive_more::Display;
- use once_cell::sync::Lazy;
- use tokio::task::JoinHandle;
-
- use crate::metadata::STREAM_INFO;
-
- type SchedulerHandle = JoinHandle<()>;
-
- static SCHEDULER_HANDLER: Lazy<Mutex<Option<SchedulerHandle>>> = Lazy::new(|| Mutex::new(None));
-
- pub fn load_retention_from_global() {
-     log::info!("loading retention for all streams");
-     init_scheduler();
- }
-
- pub fn init_scheduler() {
-     log::info!("Setting up scheduler");
-     let mut scheduler = AsyncScheduler::new();
-     let func = move || async {
-         //get retention every day at 12 am
-         for stream in STREAM_INFO.list_streams() {
-             let retention = STREAM_INFO.get_retention(&stream);
-
-             match retention {
-                 Ok(config) => {
-                     if let Some(config) = config {
-                         for Task { action, days, ..
} in config.tasks.into_iter() { - match action { - Action::Delete => { - let stream = stream.to_string(); - tokio::spawn(async move { - action::delete(stream.clone(), u32::from(days)).await; - }); - } - }; - } - } - } - Err(err) => { - log::warn!("failed to load retention config for {stream} due to {err:?}") - } - }; - } - }; - - // Execute once on startup - tokio::spawn(async move { - func().await; - }); - - scheduler.every(1.day()).at("00:00").run(func); - - let scheduler_handler = tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - scheduler.run_pending().await; - } - }); - - *SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler); - log::info!("Scheduler is initialized") - } +use std::hash::Hash; +use std::num::NonZeroU32; +use std::sync::Mutex; +use std::time::Duration; + +use clokwerk::AsyncScheduler; +use clokwerk::Job; +use clokwerk::TimeUnits; +use derive_more::Display; +use once_cell::sync::Lazy; +use tokio::task::JoinHandle; + +use crate::metadata::STREAM_INFO; + +type SchedulerHandle = JoinHandle<()>; + +static SCHEDULER_HANDLER: Lazy>> = Lazy::new(|| Mutex::new(None)); + +pub fn load_retention_from_global() { + log::info!("loading retention for all streams"); + init_scheduler(); +} + +pub fn init_scheduler() { + log::info!("Setting up scheduler"); + let mut scheduler = AsyncScheduler::new(); + let func = move || async { + //get retention every day at 12 am + for stream in STREAM_INFO.list_streams() { + let retention = STREAM_INFO.get_retention(&stream); + + match retention { + Ok(config) => { + if let Some(config) = config { + for Task { action, days, .. } in config.tasks.into_iter() { + match action { + Action::Delete => { + let stream = stream.to_string(); + tokio::spawn(async move { + action::delete(stream.clone(), u32::from(days)).await; + }); + } + }; + } + } + } + Err(err) => { + log::warn!("failed to load retention config for {stream} due to {err:?}") + } + }; + } + }; + + // Execute once on startup + tokio::spawn(async move { + func().await; + }); + + scheduler.every(1.day()).at("00:00").run(func); + + let scheduler_handler = tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + scheduler.run_pending().await; + } + }); + + *SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler); + log::info!("Scheduler is initialized") +} #[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] #[serde(try_from = "Vec")] From 591487c4d8ae19d318de7fb9892997a3f701c403 Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Sat, 3 Aug 2024 12:23:27 +0530 Subject: [PATCH 05/12] restructured `run_local_sync()` --- server/src/sync.rs | 60 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 13 deletions(-) diff --git a/server/src/sync.rs b/server/src/sync.rs index 8c06ec70e..090ac4a76 100644 --- a/server/src/sync.rs +++ b/server/src/sync.rs @@ -20,6 +20,7 @@ use tokio::select; use tokio::sync::oneshot; use tokio::task; use tokio::time::{interval, Duration}; +use std::panic::{self, AssertUnwindSafe}; use crate::option::CONFIG; use crate::{storage, STORAGE_UPLOAD_INTERVAL}; @@ -29,7 +30,7 @@ pub async fn object_store_sync() -> ( oneshot::Receiver<()>, oneshot::Sender<()>, ) { - let (_outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>(); let handle = task::spawn(async move { @@ -38,8 +39,22 @@ pub async fn object_store_sync() 
-> (
         loop {
             select! {
                 _ = interval.tick() => {
-                    if let Err(e) = CONFIG.storage().get_object_store().sync().await {
-                        log::warn!("failed to sync local data with object store. {:?}", e);
+                    match task::spawn(async {
+                        CONFIG.storage().get_object_store().sync().await
+                    }).await {
+                        Ok(Ok(_)) => {
+                            log::info!("Successfully synced local data with object store.");
+                        }
+                        Ok(Err(e)) => {
+                            log::warn!("Failed to sync local data with object store: {:?}", e);
+                        }
+                        Err(e) => {
+                            log::error!("Task panicked during sync: {:?}", e);
+                            if let Err(send_err) = outbox_tx.send(()) {
+                                log::error!("Failed to send outbox message: {:?}", send_err);
+                            }
+                            break;
+                        }
+                    }
                 }
                 _ = &mut inbox_rx => break,
@@ -55,21 +70,40 @@ pub async fn run_local_sync() -> (
     oneshot::Receiver<()>,
     oneshot::Sender<()>,
 ) {
-    let (_outbox_tx, outbox_rx) = oneshot::channel::<()>();
-    let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();
+    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
+    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
 
     let handle = task::spawn(async move {
-        let mut interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL));
-
-        loop {
-            select! {
-                _ = interval.tick() => {
-                    crate::event::STREAM_WRITERS.unset_all();
+        log::info!("Local sync task started");
+        let mut inbox_rx = inbox_rx;
+        let result = panic::catch_unwind(AssertUnwindSafe(|| async move {
+            let mut interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL));
+            loop {
+                tokio::select! {
+                    _ = interval.tick() => {
+                        crate::event::STREAM_WRITERS.unset_all();
+                    }
+                    _ = &mut inbox_rx => {
+                        log::info!("Received signal to stop local sync");
+                        return; // Exit the async block when signaled
+                    }
                 }
-                _ = &mut inbox_rx => break,
+            }
+        }));
+
+        match result {
+            Ok(future) => {
+                future.await;  // We don't need to check the result here
+            }
+            Err(panic_error) => {
+                log::error!("Panic in local sync task: {:?}", panic_error);
             }
         }
+
+        // Signal that the task has ended, regardless of how it ended
+        let _ = outbox_tx.send(());
+        log::info!("Local sync task ended");
     });
 
     (handle, outbox_rx, inbox_tx)
-}
+}
\ No newline at end of file

From 79c29655cf91de13f7a7e6ab36223783b4ef1c93 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Sat, 3 Aug 2024 15:13:00 +0530
Subject: [PATCH 06/12] corrected time sync issue

---
 server/src/sync.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/server/src/sync.rs b/server/src/sync.rs
index 090ac4a76..fc94f7a84 100644
--- a/server/src/sync.rs
+++ b/server/src/sync.rs
@@ -16,11 +16,11 @@
  *
  */
 
+use std::panic::{self, AssertUnwindSafe};
 use tokio::select;
 use tokio::sync::oneshot;
 use tokio::task;
 use tokio::time::{interval, Duration};
-use std::panic::{self, AssertUnwindSafe};
 
 use crate::option::CONFIG;
 use crate::{storage, STORAGE_UPLOAD_INTERVAL};
@@ -59,6 +59,7 @@ pub async fn object_store_sync() -> (
                 }
                 _ = &mut inbox_rx => break,
             }
+            tokio::time::sleep(Duration::from_secs(1)).await;
         }
     });
@@ -88,12 +89,13 @@ pub async fn run_local_sync() -> (
                         return; // Exit the async block when signaled
                     }
                 }
+                tokio::time::sleep(Duration::from_millis(50)).await;
             }
         }));
 
         match result {
             Ok(future) => {
-                future.await;  // We don't need to check the result here
+                future.await; // We don't need to check the result here
             }
             Err(panic_error) => {
                 log::error!("Panic in local sync task: {:?}", panic_error);
@@ -106,4 +108,4 @@ pub async fn run_local_sync() -> (
     });
 
     (handle, outbox_rx, inbox_tx)
-}
\ No newline at end of file
+}

From 66dbbc85b8c1dea257ac9467c8cb87364aed24bd Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Sat, 3 Aug 2024 22:25:27 +0530
Subject: [PATCH 07/12] pass server integrity test

---
 server/src/sync.rs | 100 +++++++++++++++++++++++++++------------------
 1 file changed, 61 insertions(+), 39 deletions(-)

diff --git a/server/src/sync.rs b/server/src/sync.rs
index fc94f7a84..4bb39c9cc 100644
--- a/server/src/sync.rs
+++ b/server/src/sync.rs
@@ -16,11 +16,12 @@
  *
  */
 
-use std::panic::{self, AssertUnwindSafe};
-use tokio::select;
 use tokio::sync::oneshot;
 use tokio::task;
 use tokio::time::{interval, Duration};
+use std::panic::AssertUnwindSafe;
+use clokwerk::{AsyncScheduler, TimeUnits, Job};
+use futures::FutureExt;
 
 use crate::option::CONFIG;
 use crate::{storage, STORAGE_UPLOAD_INTERVAL};
@@ -31,36 +32,49 @@ pub async fn object_store_sync() -> (
     oneshot::Sender<()>,
 ) {
     let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
-    let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();
+    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
 
     let handle = task::spawn(async move {
-        let mut interval = interval(Duration::from_secs((STORAGE_UPLOAD_INTERVAL + 5).into()));
-
-        loop {
-            select! {
-                _ = interval.tick() => {
-                    match task::spawn(async {
-                        CONFIG.storage().get_object_store().sync().await
-                    }).await {
-                        Ok(Ok(_)) => {
-                            log::info!("Successfully synced local data with object store.");
-                        }
-                        Ok(Err(e)) => {
-                            log::warn!("Failed to sync local data with object store: {:?}", e);
-                        }
-                        Err(e) => {
-                            log::error!("Task panicked during sync: {:?}", e);
-                            if let Err(send_err) = outbox_tx.send(()) {
-                                log::error!("Failed to send outbox message: {:?}", send_err);
-                            }
-                            break;
-                        }
+        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
+            let mut scheduler = AsyncScheduler::new();
+            scheduler
+                .every(STORAGE_UPLOAD_INTERVAL.seconds())
+                .plus(5u32.seconds())
+                .run(|| async {
+                    if let Err(e) = CONFIG.storage().get_object_store().sync().await {
+                        log::warn!("failed to sync local data with object store. {:?}", e);
+                    }
+                });
+
+            let mut inbox_rx = AssertUnwindSafe(inbox_rx);
+            let mut check_interval = interval(Duration::from_secs(1));
+
+            loop {
+                check_interval.tick().await;
+                scheduler.run_pending().await;
+
+                match inbox_rx.try_recv() {
+                    Ok(_) => break,
+                    Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue,
+                    Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
+                        log::warn!("Inbox channel closed unexpectedly");
+                        break;
                     }
                 }
-                _ = &mut inbox_rx => break,
             }
-            tokio::time::sleep(Duration::from_secs(1)).await;
+        }));
+
+        match result {
+            Ok(future) => {
+                future.await;
+            }
+            Err(panic_error) => {
+                log::error!("Panic in object store sync task: {:?}", panic_error);
+                let _ = outbox_tx.send(());
+            }
         }
+
+        log::info!("Object store sync task ended");
     });
 
     (handle, outbox_rx, inbox_tx)
@@ -76,26 +90,34 @@ pub async fn run_local_sync() -> (
 
     let handle = task::spawn(async move {
         log::info!("Local sync task started");
-        let mut inbox_rx = inbox_rx;
-        let result = panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL));
+        let mut inbox_rx = AssertUnwindSafe(inbox_rx);
+
+        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
+            let mut sync_interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL));
+            let mut check_interval = interval(Duration::from_millis(50));
+
             loop {
-                tokio::select! {
-                    _ = interval.tick() => {
-                        crate::event::STREAM_WRITERS.unset_all();
-                    }
-                    _ = &mut inbox_rx => {
-                        log::info!("Received signal to stop local sync");
-                        return; // Exit the async block when signaled
+                // Run STREAM_WRITERS.unset_all() at every interval tick
+                if sync_interval.tick().now_or_never().is_some() {
+                    crate::event::STREAM_WRITERS.unset_all();
+                }
+
+                // Check inbox every 50ms
+                check_interval.tick().await;
+                match inbox_rx.try_recv() {
+                    Ok(_) => break,
+                    Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue,
+                    Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
+                        log::warn!("Inbox channel closed unexpectedly");
+                        break;
                     }
                 }
-                tokio::time::sleep(Duration::from_millis(50)).await;
             }
         }));
 
         match result {
             Ok(future) => {
-                future.await; // We don't need to check the result here
+                future.await;
             }
             Err(panic_error) => {
                 log::error!("Panic in local sync task: {:?}", panic_error);
@@ -108,4 +130,4 @@ pub async fn run_local_sync() -> (
     });
 
     (handle, outbox_rx, inbox_tx)
-}
+}
\ No newline at end of file

From 93152d8913717a51e62abd6912e5d31fb2a0a511 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Sun, 4 Aug 2024 10:33:59 +0530
Subject: [PATCH 08/12] switched back to `AsyncScheduler`

---
 server/src/sync.rs | 34 ++++++++++++++++++----------------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/server/src/sync.rs b/server/src/sync.rs
index 4bb39c9cc..446f53b7e 100644
--- a/server/src/sync.rs
+++ b/server/src/sync.rs
@@ -16,12 +16,11 @@
  *
  */
 
+use clokwerk::{AsyncScheduler, Job, TimeUnits};
+use std::panic::AssertUnwindSafe;
 use tokio::sync::oneshot;
 use tokio::task;
-use tokio::time::{interval, Duration};
-use std::panic::AssertUnwindSafe;
-use clokwerk::{AsyncScheduler, TimeUnits, Job};
-use futures::FutureExt;
+use tokio::time::{interval, sleep, Duration};
 
 use crate::option::CONFIG;
 use crate::{storage, STORAGE_UPLOAD_INTERVAL};
@@ -90,20 +89,24 @@ pub async fn run_local_sync() -> (
 
     let handle = task::spawn(async move {
         log::info!("Local sync task started");
-        let mut inbox_rx = AssertUnwindSafe(inbox_rx);
-
+        let mut inbox_rx = inbox_rx;
+
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval = interval(Duration::from_secs(storage::LOCAL_SYNC_INTERVAL));
-            let mut check_interval = interval(Duration::from_millis(50));
+            let mut scheduler = AsyncScheduler::new();
+            scheduler
+                .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
+                .run(|| async {
+                    crate::event::STREAM_WRITERS.unset_all();
+                });
 
             loop {
-                // Run STREAM_WRITERS.unset_all() at every interval tick
-                if sync_interval.tick().now_or_never().is_some() {
-                    crate::event::STREAM_WRITERS.unset_all();
-                }
+                // Sleep for 50ms
+                sleep(Duration::from_millis(50)).await;
 
-                // Check inbox every 50ms
-                check_interval.tick().await;
+                // Run any pending scheduled tasks
+                scheduler.run_pending().await;
+
+                // Check inbox
                 match inbox_rx.try_recv() {
                     Ok(_) => break,
                     Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue,
                     Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
@@ -124,10 +127,9 @@ pub async fn run_local_sync() -> (
             }
         }
 
-        // Signal that the task has ended, regardless of how it ended
         let _ = outbox_tx.send(());
         log::info!("Local sync task ended");
     });
 
     (handle, outbox_rx, inbox_tx)
-}
\ No newline at end of file
+}

From ac49651f5cba4d5b07b9e0b2f5c7cf4eb6705c96 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Sun, 4 Aug 2024 11:46:37 +0530
Subject: [PATCH 09/12] corrected indentation

---
 server/src/handlers/http/modal/query_server.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index d0e9e5988..73a44faf1 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -191,7 +191,8 @@ impl QueryServer {
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.download_from_s3()?;
         };
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await;
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
+            sync::run_local_sync().await;
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;
 

From cd90c17c6a2829633c75e0c3e9739c4fe311a48c Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Sun, 4 Aug 2024 13:05:24 +0530
Subject: [PATCH 10/12] changed hot tier manager execution order

---
 server/src/handlers/http/modal/query_server.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 73a44faf1..2666548fc 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -188,14 +188,16 @@ impl QueryServer {
         if matches!(init_cluster_metrics_schedular(), Ok(())) {
             log::info!("Cluster metrics scheduler started successfully");
         }
-        if let Some(hot_tier_manager) = HotTierManager::global() {
-            hot_tier_manager.download_from_s3()?;
-        };
+
         let (localsync_handler, mut localsync_outbox, localsync_inbox) =
             sync::run_local_sync().await;
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;
 
+        if let Some(hot_tier_manager) = HotTierManager::global() {
+            hot_tier_manager.download_from_s3()?;
+        };
+
         tokio::spawn(airplane::server());
 
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());

From e9ade78e17407ceb9b2b95ffd794455163db634f Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Mon, 5 Aug 2024 09:12:34 +0530
Subject: [PATCH 11/12] Revert to `0f8d4ad`

---
 server/src/handlers/http/modal/query_server.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 2666548fc..d0e9e5988 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -188,15 +188,12 @@ impl QueryServer {
         if matches!(init_cluster_metrics_schedular(), Ok(())) {
             log::info!("Cluster metrics scheduler started successfully");
         }
-
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
-            sync::run_local_sync().await;
-        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
-            sync::object_store_sync().await;
-
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.download_from_s3()?;
         };
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await;
+        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
+            sync::object_store_sync().await;
 
         tokio::spawn(airplane::server());
 
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());

From 7d90b225ced7042a2892ce18e3adc8af7d6f2f61 Mon Sep 17 00:00:00 2001
From: Nitish Tiwari
Date: Mon, 5 Aug 2024 14:16:27 +0530
Subject: [PATCH 12/12] fix linter

---
 server/src/handlers/http/modal/query_server.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index d0e9e5988..73a44faf1 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -191,7 +191,8 @@ impl QueryServer {
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.download_from_s3()?;
         };
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await;
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
+            sync::run_local_sync().await;
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;
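
Taken together, patches 05 through 12 settle on one supervision pattern for both sync loops: spawn a tokio task that drives a clokwerk `AsyncScheduler`, poll the `oneshot` inbox with `try_recv()` for a shutdown signal, and fire the `oneshot` outbox whenever the task exits so the server loop can decide whether to restart it. Below is a minimal, self-contained sketch of that pattern, assuming the `tokio`, `clokwerk`, `futures`, `log`, and `anyhow` crates; `spawn_sync_task`, `do_sync`, and `SYNC_INTERVAL_SECS` are hypothetical names standing in for the real sync call and interval, not the code as merged. One deliberate difference: `std::panic::catch_unwind` around an `async` closure, as used in patches 07 and 08, only guards construction of the future, so a panic raised at an `.await` point inside the loop is not caught there (it surfaces through the task's `JoinHandle` instead, as patch 05's `Err(e)` arm anticipates); the sketch applies `futures::FutureExt::catch_unwind` to the future itself, which does catch such panics.

use std::panic::AssertUnwindSafe;

use clokwerk::{AsyncScheduler, TimeUnits};
use futures::FutureExt; // provides `catch_unwind` for futures
use tokio::sync::oneshot;
use tokio::task::{self, JoinHandle};
use tokio::time::{sleep, Duration};

// Hypothetical stand-in for the real upload interval.
const SYNC_INTERVAL_SECS: u32 = 60;

// Hypothetical stand-in for CONFIG.storage().get_object_store().sync().
async fn do_sync() -> anyhow::Result<()> {
    Ok(())
}

pub async fn spawn_sync_task() -> (
    JoinHandle<()>,
    oneshot::Receiver<()>, // outbox: fires when the task exits
    oneshot::Sender<()>,   // inbox: send to ask the task to stop
) {
    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
    let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();

    let handle = task::spawn(async move {
        let mut scheduler = AsyncScheduler::new();
        scheduler.every(SYNC_INTERVAL_SECS.seconds()).run(|| async {
            if let Err(e) = do_sync().await {
                log::warn!("failed to sync local data with object store. {:?}", e);
            }
        });

        // Catch panics raised at any `.await` point inside the loop, not
        // just while the future is being constructed.
        let result = AssertUnwindSafe(async {
            loop {
                // Wake frequently so pending jobs and the shutdown signal
                // are both noticed promptly.
                sleep(Duration::from_millis(50)).await;
                scheduler.run_pending().await;

                match inbox_rx.try_recv() {
                    Ok(_) => break, // caller asked the task to stop
                    Err(oneshot::error::TryRecvError::Empty) => continue,
                    Err(oneshot::error::TryRecvError::Closed) => {
                        log::warn!("Inbox channel closed unexpectedly");
                        break;
                    }
                }
            }
        })
        .catch_unwind()
        .await;

        if let Err(panic_error) = result {
            log::error!("Panic in sync task: {:?}", panic_error);
        }

        // Signal the supervisor regardless of how the loop ended.
        let _ = outbox_tx.send(());
    });

    (handle, outbox_rx, inbox_tx)
}

Polling `try_recv()` on a 50 ms tick, rather than `select!`-ing on the receiver, is what lets the scheduler and the shutdown check share one loop without borrowing the receiver across `select!` arms; it is the same trade-off patches 07 and 08 make.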