diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 8f89136ef..d894848cc 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -27,7 +27,7 @@ use once_cell::sync::Lazy; use serde_json::Error as SerdeError; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display}; -use tokio::sync::oneshot::{Receiver, Sender}; +use tokio::sync::oneshot::{self, Receiver, Sender}; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::{trace, warn}; @@ -733,10 +733,16 @@ impl Alerts { let store = CONFIG.storage().get_object_store(); for alert in store.get_alerts().await.unwrap_or_default() { - let (handle, rx, tx) = - schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; - - self.update_task(alert.id, handle, rx, tx).await; + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); + let handle = schedule_alert_task( + alert.get_eval_frequency(), + alert.clone(), + inbox_rx, + outbox_tx, + )?; + + self.update_task(alert.id, handle, outbox_rx, inbox_tx).await; map.insert(alert.id, alert); } diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 9bdeefc2c..a619ae989 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -25,6 +25,7 @@ use actix_web::{ HttpRequest, Responder, }; use bytes::Bytes; +use tokio::sync::oneshot; use ulid::Ulid; use crate::alerts::{ @@ -55,7 +56,14 @@ pub async fn post( user_auth_for_query(&session_key, &alert.query).await?; // create scheduled tasks - let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); + let handle = schedule_alert_task( + alert.get_eval_frequency(), + alert.clone(), + inbox_rx, + outbox_tx, + )?; // now that we've validated that the user can run this query // move on to saving the alert in ObjectStore @@ -67,7 +75,9 @@ pub async fn post( let alert_bytes = serde_json::to_vec(&alert)?; store.put_object(&path, Bytes::from(alert_bytes)).await?; - ALERTS.update_task(alert.id, handle, rx, tx).await; + ALERTS + .update_task(alert.id, handle, outbox_rx, inbox_tx) + .await; Ok(web::Json(alert)) } @@ -136,7 +146,14 @@ pub async fn modify( alert.validate().await?; // modify task - let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); + let handle = schedule_alert_task( + alert.get_eval_frequency(), + alert.clone(), + inbox_rx, + outbox_tx, + )?; // modify on disk CONFIG @@ -148,7 +165,9 @@ pub async fn modify( // modify in memory ALERTS.update(&alert).await; - ALERTS.update_task(alert.id, handle, rx, tx).await; + ALERTS + .update_task(alert.id, handle, outbox_rx, inbox_tx) + .await; Ok(web::Json(alert)) } diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 8f0c92ca0..712c7b066 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -16,6 +16,8 @@ * */ +use std::thread; + use super::ingest::ingestor_logstream; use super::ingest::ingestor_rbac; use super::ingest::ingestor_role; @@ -54,7 +56,6 @@ use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde_json::Value; use tokio::sync::oneshot; -use tracing::error; use tracing::info; /// Metadata associated with this ingestor server @@ -199,15 +200,9 @@ impl ParseableServer 
for IngestServer { migration::run_migration(&CONFIG).await?; - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - let ( - mut remote_conversion_handler, - mut remote_conversion_outbox, - mut remote_conversion_inbox, - ) = sync::arrow_conversion().await; + // Run sync on a background thread + let (cancel_tx, cancel_rx) = oneshot::channel(); + thread::spawn(|| sync::handler(cancel_rx)); tokio::spawn(airplane::server()); @@ -215,49 +210,11 @@ impl ParseableServer for IngestServer { set_ingestor_metadata().await?; // Ingestors shouldn't have to deal with OpenId auth flow - let app = self.start(shutdown_rx, prometheus.clone(), None); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - remote_conversion_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_conversion_handler.await { - error!("Error joining remote_conversion_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - }, - _ = &mut remote_conversion_outbox => { - // remote_conversion failed, this is recoverable by just starting remote_conversion thread again - if let Err(e) = remote_conversion_handler.await { - error!("Error joining remote_conversion_handler: {:?}", e); - } - (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await; - } - - } - } + let result = self.start(shutdown_rx, prometheus.clone(), None).await; + // Cancel sync jobs + cancel_tx.send(()).expect("Cancellation should not fail"); + + result } } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 83b4206dd..938e05dc5 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -16,6 +16,8 @@ * */ +use std::thread; + use crate::alerts::ALERTS; use crate::correlation::CORRELATIONS; use crate::handlers::airplane; @@ -27,10 +29,9 @@ use crate::handlers::http::{self, role}; use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; -use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; -use crate::{analytics, migration, storage}; +use crate::{analytics, migration, storage, sync}; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; use actix_web_prometheus::PrometheusMetrics; @@ -133,44 
+134,18 @@ impl ParseableServer for QueryServer { hot_tier_manager.put_internal_stream_hot_tier().await?; hot_tier_manager.download_from_s3()?; }; - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; + + // Run sync on a background thread + let (cancel_tx, cancel_rx) = oneshot::channel(); + thread::spawn(|| sync::handler(cancel_rx)); tokio::spawn(airplane::server()); - let app = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid()); - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - error!("Error joining localsync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } + let result = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid()).await; + // Cancel sync jobs + cancel_tx.send(()).expect("Cancellation should not fail"); - }; - } + result } } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index ad6be59c0..4493a954a 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -16,6 +16,8 @@ * */ +use std::thread; + use crate::alerts::ALERTS; use crate::analytics; use crate::correlation::CORRELATIONS; @@ -130,15 +132,10 @@ impl ParseableServer for Server { hot_tier_manager.download_from_s3()?; }; - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - let ( - mut remote_conversion_handler, - mut remote_conversion_outbox, - mut remote_conversion_inbox, - ) = sync::arrow_conversion().await; + // Run sync on a background thread + let (cancel_tx, cancel_rx) = oneshot::channel(); + thread::spawn(|| sync::handler(cancel_rx)); + if CONFIG.options.send_analytics { analytics::init_analytics_scheduler()?; } @@ -146,50 +143,13 @@ impl ParseableServer for Server { tokio::spawn(handlers::livetail::server()); tokio::spawn(handlers::airplane::server()); - let app = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid()); - - tokio::pin!(app); - - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. 
stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - remote_conversion_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_conversion_handler.await { - error!("Error joining remote_conversion_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - }, - _ = &mut remote_conversion_outbox => { - // remote_conversion failed, this is recoverable by just starting remote_conversion thread again - if let Err(e) = remote_conversion_handler.await { - error!("Error joining remote_conversion_handler: {:?}", e); - } - (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await; - } - - }; - } + let result = self + .start(shutdown_rx, prometheus.clone(), CONFIG.options.openid()) + .await; + // Cancel sync jobs + cancel_tx.send(()).expect("Cancellation should not fail"); + + return result; } } diff --git a/src/lib.rs b/src/lib.rs index 0eb4b0a02..061595abe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,8 +56,8 @@ pub use handlers::http::modal::{ use once_cell::sync::Lazy; use reqwest::{Client, ClientBuilder}; -pub const STORAGE_CONVERSION_INTERVAL: u32 = 60; -pub const STORAGE_UPLOAD_INTERVAL: u32 = 30; +pub const STORAGE_CONVERSION_INTERVAL: u64 = 60; +pub const STORAGE_UPLOAD_INTERVAL: u64 = 30; // A single HTTP client for all outgoing HTTP requests from the parseable server static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| { diff --git a/src/sync.rs b/src/sync.rs index 1eceb21bc..6adb65dec 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -27,7 +27,8 @@ use tracing::{error, info, trace, warn}; use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; use crate::staging::STAGING; -use crate::{storage, STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; +use crate::storage::LOCAL_SYNC_INTERVAL; +use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; // Calculates the instant that is the start of the next minute fn next_minute() -> Instant { @@ -72,7 +73,57 @@ where } } -pub async fn object_store_sync() -> ( +/// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every +/// `STORAGE_CONVERSION_INTERVAL` seconds and uploads them every `STORAGE_UPLOAD_INTERVAL` seconds. 
+#[tokio::main(flavor = "current_thread")] +pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> { + let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync(); + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + object_store_sync(); + let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) = + arrow_conversion(); + loop { + select! { + _ = &mut cancel_rx => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + remote_conversion_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + error!("Error joining localsync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + error!("Error joining remote_sync_handler: {:?}", e); + } + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } + return Ok(()); + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync(); + }, + _ = &mut remote_conversion_outbox => { + // remote_conversion failed, this is recoverable by just starting remote_conversion thread again + if let Err(e) = remote_conversion_handler.await { + error!("Error joining remote_conversion_handler: {:?}", e); + } + (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = arrow_conversion(); + }, + } + } +} + +pub fn object_store_sync() -> ( task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>, @@ -82,10 +133,8 @@ pub async fn object_store_sync() -> ( let handle = task::spawn(async move { let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut sync_interval = interval_at( - next_minute(), - Duration::from_secs(STORAGE_UPLOAD_INTERVAL as u64), - ); + let mut sync_interval = + interval_at(next_minute(), Duration::from_secs(STORAGE_UPLOAD_INTERVAL)); let mut inbox_rx = AssertUnwindSafe(inbox_rx); @@ -136,7 +185,7 @@ pub async fn object_store_sync() -> ( (handle, outbox_rx, inbox_tx) } -pub async fn arrow_conversion() -> ( +pub fn arrow_conversion() -> ( task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>, @@ -148,7 +197,7 @@ pub async fn arrow_conversion() -> ( let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { let mut sync_interval = interval_at( next_minute() + Duration::from_secs(5), // 5 second delay - Duration::from_secs(STORAGE_CONVERSION_INTERVAL as u64), + Duration::from_secs(STORAGE_CONVERSION_INTERVAL), ); let mut inbox_rx = AssertUnwindSafe(inbox_rx); @@ -193,7 +242,7 @@ pub async fn arrow_conversion() -> ( (handle, outbox_rx, inbox_tx) } -pub async fn run_local_sync() -> ( +pub fn run_local_sync() -> ( task::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>, @@ -206,10 +255,8 @@ pub async fn run_local_sync() -> ( let mut inbox_rx = inbox_rx; let 
result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut sync_interval = interval_at( - next_minute(), - Duration::from_secs(storage::LOCAL_SYNC_INTERVAL), - ); + let mut sync_interval = + interval_at(next_minute(), Duration::from_secs(LOCAL_SYNC_INTERVAL)); loop { select! { @@ -244,20 +291,12 @@ pub async fn run_local_sync() -> ( (handle, outbox_rx, inbox_tx) } -pub async fn schedule_alert_task( +pub fn schedule_alert_task( eval_frequency: u64, alert: AlertConfig, -) -> Result< - ( - task::JoinHandle<()>, - oneshot::Receiver<()>, - oneshot::Sender<()>, - ), - AlertError, -> { - let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); - let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); - + inbox_rx: oneshot::Receiver<()>, + outbox_tx: oneshot::Sender<()>, +) -> Result<task::JoinHandle<()>, AlertError> { let handle = tokio::task::spawn(async move { info!("new alert task started for {alert:?}"); @@ -296,5 +335,5 @@ } } }); - Ok((handle, outbox_rx, inbox_tx)) + Ok(handle) }
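
Note: the diffs above converge on one pattern. Each server spawns `sync::handler` on a dedicated OS thread that owns its own current-thread Tokio runtime, and stops it over a `oneshot` channel once the actix server returns. Below is a minimal, self-contained sketch of that pattern, assuming only the `tokio` and `anyhow` crates; the `background_sync` name and the one-second tick are illustrative stand-ins, not the Parseable API.

```rust
use std::thread;
use std::time::Duration;
use tokio::sync::oneshot;

// Stand-in for `sync::handler`: a current-thread runtime driving periodic work
// until the server signals shutdown over the oneshot channel.
#[tokio::main(flavor = "current_thread")]
async fn background_sync(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    loop {
        tokio::select! {
            // Server finished; stop syncing and let the thread exit cleanly.
            _ = &mut cancel_rx => return Ok(()),
            _ = tick.tick() => { /* one round of sync work */ }
        }
    }
}

fn main() -> anyhow::Result<()> {
    let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
    // Dedicated OS thread + dedicated runtime, so sync work cannot starve the HTTP runtime.
    let sync_thread = thread::spawn(|| background_sync(cancel_rx));

    // ... run the actix server here and wait for it to return ...

    cancel_tx.send(()).expect("sync thread should still be listening");
    sync_thread.join().expect("sync thread panicked")?;
    Ok(())
}
```

The `expect` on `cancel_tx.send(())` mirrors the servers above: the send only fails if the sync thread has already died, which the patch treats as unrecoverable.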