Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use once_cell::sync::Lazy;
use serde_json::Error as SerdeError;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{trace, warn};
Expand Down Expand Up @@ -733,10 +733,16 @@ impl Alerts {
let store = CONFIG.storage().get_object_store();

for alert in store.get_alerts().await.unwrap_or_default() {
let (handle, rx, tx) =
schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;

self.update_task(alert.id, handle, rx, tx).await;
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let handle = schedule_alert_task(
alert.get_eval_frequency(),
alert.clone(),
inbox_rx,
outbox_tx,
)?;

self.update_task(alert.id, handle, outbox_rx, inbox_tx).await;

map.insert(alert.id, alert);
}
Expand Down
27 changes: 23 additions & 4 deletions src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use actix_web::{
HttpRequest, Responder,
};
use bytes::Bytes;
use tokio::sync::oneshot;
use ulid::Ulid;

use crate::alerts::{
Expand Down Expand Up @@ -55,7 +56,14 @@ pub async fn post(
user_auth_for_query(&session_key, &alert.query).await?;

// create scheduled tasks
let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let handle = schedule_alert_task(
alert.get_eval_frequency(),
alert.clone(),
inbox_rx,
outbox_tx,
)?;

// now that we've validated that the user can run this query
// move on to saving the alert in ObjectStore
Expand All @@ -67,7 +75,9 @@ pub async fn post(
let alert_bytes = serde_json::to_vec(&alert)?;
store.put_object(&path, Bytes::from(alert_bytes)).await?;

ALERTS.update_task(alert.id, handle, rx, tx).await;
ALERTS
.update_task(alert.id, handle, outbox_rx, inbox_tx)
.await;

Ok(web::Json(alert))
}
Expand Down Expand Up @@ -136,7 +146,14 @@ pub async fn modify(
alert.validate().await?;

// modify task
let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let handle = schedule_alert_task(
alert.get_eval_frequency(),
alert.clone(),
inbox_rx,
outbox_tx,
)?;

// modify on disk
CONFIG
Expand All @@ -148,7 +165,9 @@ pub async fn modify(
// modify in memory
ALERTS.update(&alert).await;

ALERTS.update_task(alert.id, handle, rx, tx).await;
ALERTS
.update_task(alert.id, handle, outbox_rx, inbox_tx)
.await;

Ok(web::Json(alert))
}
Expand Down
63 changes: 10 additions & 53 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

use std::thread;

use super::ingest::ingestor_logstream;
use super::ingest::ingestor_rbac;
use super::ingest::ingestor_role;
Expand Down Expand Up @@ -54,7 +56,6 @@ use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde_json::Value;
use tokio::sync::oneshot;
use tracing::error;
use tracing::info;

/// Metadata associated with this ingestor server
Expand Down Expand Up @@ -199,65 +200,21 @@ impl ParseableServer for IngestServer {

migration::run_migration(&CONFIG).await?;

let (localsync_handler, mut localsync_outbox, localsync_inbox) =
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync().await;
let (
mut remote_conversion_handler,
mut remote_conversion_outbox,
mut remote_conversion_inbox,
) = sync::arrow_conversion().await;
// Run sync on a background thread
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

tokio::spawn(airplane::server());

// set the ingestor metadata
set_ingestor_metadata().await?;

// Ingestors shouldn't have to deal with OpenId auth flow
let app = self.start(shutdown_rx, prometheus.clone(), None);

tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
remote_conversion_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
},
_ = &mut remote_conversion_outbox => {
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
}

}
}
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

result
}
}

Expand Down
47 changes: 11 additions & 36 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

use std::thread;

use crate::alerts::ALERTS;
use crate::correlation::CORRELATIONS;
use crate::handlers::airplane;
Expand All @@ -27,10 +29,9 @@ use crate::handlers::http::{self, role};
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use crate::{analytics, migration, storage};
use crate::{analytics, migration, storage, sync};
use actix_web::web::{resource, ServiceConfig};
use actix_web::{web, Scope};
use actix_web_prometheus::PrometheusMetrics;
Expand Down Expand Up @@ -133,44 +134,18 @@ impl ParseableServer for QueryServer {
hot_tier_manager.put_internal_stream_hot_tier().await?;
hot_tier_manager.download_from_s3()?;
};
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync().await;

// Run sync on a background thread
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

tokio::spawn(airplane::server());
let app = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid());

tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining localsync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
}
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

};
}
result
}
}

Expand Down
66 changes: 13 additions & 53 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

use std::thread;

use crate::alerts::ALERTS;
use crate::analytics;
use crate::correlation::CORRELATIONS;
Expand Down Expand Up @@ -130,66 +132,24 @@ impl ParseableServer for Server {
hot_tier_manager.download_from_s3()?;
};

let (localsync_handler, mut localsync_outbox, localsync_inbox) =
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync().await;
let (
mut remote_conversion_handler,
mut remote_conversion_outbox,
mut remote_conversion_inbox,
) = sync::arrow_conversion().await;
// Run sync on a background thread
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

if CONFIG.options.send_analytics {
analytics::init_analytics_scheduler()?;
}

tokio::spawn(handlers::livetail::server());
tokio::spawn(handlers::airplane::server());

let app = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid());

tokio::pin!(app);

loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
remote_conversion_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
},
_ = &mut remote_conversion_outbox => {
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
}

};
}
let result = self
.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid())
.await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

return result;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub use handlers::http::modal::{
use once_cell::sync::Lazy;
use reqwest::{Client, ClientBuilder};

pub const STORAGE_CONVERSION_INTERVAL: u32 = 60;
pub const STORAGE_UPLOAD_INTERVAL: u32 = 30;
pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;

// A single HTTP client for all outgoing HTTP requests from the parseable server
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
Expand Down
Loading
Loading