5 changes: 4 additions & 1 deletion server/Cargo.toml
@@ -25,6 +25,7 @@ object_store = { version = "0.5.1", features = ["aws"] }
derive_more = "0.99.17"
env_logger = "0.9.0"
futures = "0.3"
fs_extra = "1.2.0"
http = "0.2.4"
humantime-serde = "1.1.1"
lazy_static = "1.4.0"
@@ -33,6 +34,7 @@ num_cpus = "1.0.0"
os_info = "3.0.7"
hostname = "0.3"
rand = "0.8.4"
relative-path = "1.7.2"
rustls = "0.20.6"
rustls-pemfile = "1.0.1"
rust-flatten-json = "0.2.0"
@@ -43,10 +45,11 @@ serde_json = "^1.0.8"
sysinfo = "0.26.4"
thiserror = "1"
thread-priority = "0.9.2"
tokio-stream = "0.1.8"
tokio-stream = { version = "0.1.8", features = ["fs"] }
tokio = { version = "1.13.1", default-features = false, features = [
"sync",
"macros",
"fs",
] }
clokwerk = "0.4.0-rc1"
actix-web-static-files = "4.0"
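Alongside the new fs_extra and relative-path crates, this commit turns on the "fs" feature for both tokio and tokio-stream, presumably so the sync path can walk local staging directories asynchronously. A minimal sketch of what those two features unlock; the helper below is illustrative, not code from this PR:

```rust
use std::path::{Path, PathBuf};

use tokio_stream::{wrappers::ReadDirStream, StreamExt};

// Illustrative helper, not part of this PR: list a directory asynchronously.
// `tokio::fs::read_dir` needs tokio's "fs" feature, and `ReadDirStream`
// needs tokio-stream's "fs" feature -- the two features enabled above.
async fn list_dir(path: &Path) -> std::io::Result<Vec<PathBuf>> {
    let read_dir = tokio::fs::read_dir(path).await?;
    let entries: Vec<PathBuf> = ReadDirStream::new(read_dir)
        .filter_map(|entry| entry.ok().map(|e| e.path()))
        .collect()
        .await;
    Ok(entries)
}
```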
14 changes: 7 additions & 7 deletions server/src/event.rs
@@ -35,8 +35,8 @@ use std::sync::RwLock;

use crate::metadata;
use crate::metadata::LOCK_EXPECT;
use crate::s3;
use crate::storage::{ObjectStorage, StorageDir};
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};

use self::error::{EventError, StreamWriterError};

@@ -184,7 +184,7 @@ impl Event {
} else {
// if stream schema is none then it is first event,
// process first event and store schema in object store
self.process_first_event::<s3::S3, _>(event, inferred_schema)?
self.process_first_event(event, inferred_schema)?
};

metadata::STREAM_INFO.update_stats(
@@ -202,7 +202,7 @@
// This is called when the first event of a log stream is received. The first event is
// special because we parse this event to generate the schema for the log stream. This
// schema is then enforced on rest of the events sent to this log stream.
fn process_first_event<S: ObjectStorage, R: std::io::Read>(
fn process_first_event<R: std::io::Read>(
&self,
event: json::Reader<R>,
schema: Schema,
@@ -241,13 +241,13 @@ impl Event {
"setting schema on objectstore for logstream {}",
stream_name
);
let storage = S::new();
let storage = CONFIG.storage().get_object_store();

let stream_name = stream_name.clone();
spawn(async move {
if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
if let Err(e) = storage.put_schema(&stream_name, &schema).await {
// If this call has failed then currently there is no right way to make local state consistent
// this needs a fix after more constraints are safety guarentee is provided by localwriter and s3_sync.
// this needs a fix once more constraints and safety guarantees are provided by localwriter and objectstore_sync.
// Reasoning -
// - After dropping lock many events may process through
// - Processed events may sync before metadata deletion
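The first-event path no longer names a concrete backend: the `S: ObjectStorage` type parameter and `S::new()` are gone, and the handler asks the global config for whatever store is configured. A sketch of the shape this implies; method and bound details beyond what the diff shows are assumptions, and the real definitions live in `server/src/storage`:

```rust
use std::sync::Arc;

// Stand-in for the operations the call sites in this diff use.
trait ObjectStorage: Send + Sync {
    // async fn put_schema(&self, stream_name: &str, schema: &Schema) -> ...
    // plus put_alerts, get_schema, delete_stream, sync, check, ...
}

// The provider hides which backend (S3, local FS, ...) is configured and
// hands out a shared, dynamically typed handle.
trait ObjectStorageProvider {
    fn get_object_store(&self) -> Arc<dyn ObjectStorage>;
}
```

Because the handle is an owned smart pointer rather than a zero-sized generic, it can be moved into the `spawn`ed task that writes the schema, as the diff above does.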
7 changes: 4 additions & 3 deletions server/src/handlers/event.rs
@@ -22,9 +22,10 @@ use actix_web::{web, HttpRequest, HttpResponse};
use serde_json::Value;

use crate::event;
use crate::option::CONFIG;
use crate::query::Query;
use crate::response::QueryResponse;
use crate::s3::S3;
use crate::storage::ObjectStorageProvider;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
use crate::utils::{self, flatten_json_body, merge};

@@ -39,9 +40,9 @@ pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResp
let json = json.into_inner();
let query = Query::parse(json)?;

let storage = S3::new();
let storage = CONFIG.storage().get_object_store();

let query_result = query.execute(&storage).await;
let query_result = query.execute(&*storage).await;

query_result
.map(Into::<QueryResponse>::into)
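Note the call site change from `query.execute(&storage)` to `query.execute(&*storage)`: assuming `get_object_store()` returns something like `Arc<dyn ObjectStorage>`, the explicit deref-and-reborrow turns the smart pointer into a plain trait-object reference. A self-contained demo of the same move with a stand-in trait:

```rust
use std::fmt::Debug;
use std::sync::Arc;

// `&(impl Trait + ?Sized)` accepts `&dyn Trait`, which is what `&*arc` yields.
fn execute(storage: &(impl Debug + ?Sized)) {
    println!("{storage:?}");
}

fn main() {
    let storage: Arc<dyn Debug> = Arc::new("s3");
    execute(&*storage); // Arc<dyn Debug> -> &dyn Debug
}
```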
44 changes: 33 additions & 11 deletions server/src/handlers/logstream.rs
@@ -24,8 +24,8 @@ use chrono::Utc;
use serde_json::Value;

use crate::alerts::Alerts;
use crate::s3::S3;
use crate::storage::{ObjectStorage, StorageDir};
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};
use crate::{event, response};
use crate::{metadata, validator};

@@ -40,17 +40,17 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
.to_http();
}

let s3 = S3::new();
let objectstore = CONFIG.storage().get_object_store();

if s3.get_schema(&stream_name).await.is_err() {
if objectstore.get_schema(&stream_name).await.is_err() {
return response::ServerResponse {
msg: format!("log stream {} does not exist", stream_name),
code: StatusCode::BAD_REQUEST,
}
.to_http();
}

if let Err(e) = s3.delete_stream(&stream_name).await {
if let Err(e) = objectstore.delete_stream(&stream_name).await {
return response::ServerResponse {
msg: format!(
"failed to delete log stream {} due to err: {}",
@@ -87,7 +87,14 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
}

pub async fn list(_: HttpRequest) -> impl Responder {
response::list_response(S3::new().list_streams().await.unwrap())
response::list_response(
CONFIG
.storage()
.get_object_store()
.list_streams()
.await
.unwrap(),
)
}

pub async fn schema(req: HttpRequest) -> HttpResponse {
@@ -101,7 +108,12 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
code: StatusCode::OK,
}
.to_http(),
Err(_) => match S3::new().get_schema(&stream_name).await {
Err(_) => match CONFIG
.storage()
.get_object_store()
.get_schema(&stream_name)
.await
{
Ok(None) => response::ServerResponse {
msg: "log stream is not initialized, please post an event before fetching schema"
.to_string(),
@@ -136,7 +148,12 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse {

let mut alerts = match alerts {
Some(alerts) => alerts,
None => match S3::new().get_alerts(&stream_name).await {
None => match CONFIG
.storage()
.get_object_store()
.get_alerts(&stream_name)
.await
{
Ok(alerts) if alerts.alerts.is_empty() => {
return response::ServerResponse {
msg: "alert configuration not set for log stream {}".to_string(),
@@ -233,7 +250,12 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
}
}

if let Err(e) = S3::new().put_alerts(&stream_name, &alerts).await {
if let Err(e) = CONFIG
.storage()
.get_object_store()
.put_alerts(&stream_name, &alerts)
.await
{
return response::ServerResponse {
msg: format!(
"failed to set alert configuration for log stream {} due to err: {}",
@@ -333,8 +355,8 @@ pub async fn create_stream_if_not_exists(stream_name: String) -> HttpResponse {
}

// Proceed to create log stream if it doesn't exist
let s3 = S3::new();
if let Err(e) = s3.create_stream(&stream_name).await {
let storage = CONFIG.storage().get_object_store();
if let Err(e) = storage.create_stream(&stream_name).await {
// Fail if unable to create log stream on object store backend
response::ServerResponse {
msg: format!(
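Every handler in this file now spells out the same `CONFIG.storage().get_object_store()` chain, a mechanical consequence of dropping `S3::new()`. If the repetition grows, one option is a small free function; the sketch below uses stand-in types, not this repo's real ones:

```rust
use std::sync::Arc;

trait ObjectStorage: Send + Sync {}

struct Noop;
impl ObjectStorage for Noop {}

struct StorageConfig;
impl StorageConfig {
    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
        Arc::new(Noop)
    }
}

struct Config {
    storage: StorageConfig,
}
impl Config {
    fn storage(&self) -> &StorageConfig {
        &self.storage
    }
}

// Hypothetical convenience so handlers bind the handle once per request:
fn object_store(config: &Config) -> Arc<dyn ObjectStorage> {
    config.storage().get_object_store()
}
```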
5 changes: 2 additions & 3 deletions server/src/handlers/mod.rs
@@ -23,8 +23,7 @@ use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use sysinfo::{System, SystemExt};

use crate::s3::S3;
use crate::storage::ObjectStorage;
use crate::{option::CONFIG, storage::ObjectStorageProvider};

pub async fn liveness() -> HttpResponse {
// If the available memory is less than 100MiB, return a 503 error.
@@ -37,7 +36,7 @@
}

pub async fn readiness() -> HttpResponse {
if let Ok(()) = S3::new().check().await {
if CONFIG.storage().get_object_store().check().await.is_ok() {
return HttpResponse::new(StatusCode::OK);
}

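Besides swapping in the configured store, the readiness probe collapses to a single expression; the rewrite from `if let Ok(()) = ...` to `.is_ok()` is behavior-preserving. A minimal illustration:

```rust
fn check() -> Result<(), std::io::Error> {
    Ok(())
}

// Old shape: pattern-match and fall through.
fn readiness_old() -> bool {
    if let Ok(()) = check() {
        return true;
    }
    false
}

// New shape: same truth table, one expression.
fn readiness_new() -> bool {
    check().is_ok()
}
```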
30 changes: 15 additions & 15 deletions server/src/main.rs
@@ -47,15 +47,14 @@ mod metadata;
mod option;
mod query;
mod response;
mod s3;
mod stats;
mod storage;
mod utils;
mod validator;

use option::CONFIG;
use s3::S3;
use storage::ObjectStorage;

use crate::storage::ObjectStorageProvider;

// Global configurations
const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000;
@@ -67,44 +66,45 @@ async fn main() -> anyhow::Result<()> {
env_logger::init();
CONFIG.print();
CONFIG.validate();
let storage = S3::new();
CONFIG.validate_storage(&storage).await;
if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
let storage = CONFIG.storage().get_object_store();
CONFIG.validate_storage(&*storage).await;
if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
warn!("could not populate local metadata. {:?}", e);
}
// track all parquet files already in the data directory
storage::CACHED_FILES.track_parquet();

let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
object_store_sync();

let app = run_http();
tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
s3sync_inbox.send(()).unwrap_or(());
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
localsync_handler.join().unwrap_or(());
s3sync_handler.join().unwrap_or(());
remote_sync_handler.join().unwrap_or(());
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to disc. This can happen due to critical error in disc or environment. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut s3sync_outbox => {
// s3sync failed, this is recoverable by just starting s3sync thread again
s3sync_handler.join().unwrap_or(());
(s3sync_handler, s3sync_outbox, s3sync_inbox) = s3_sync();
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
remote_sync_handler.join().unwrap_or(());
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
}
};
}
}

fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
@@ -116,7 +116,7 @@ fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
scheduler
.every((CONFIG.parseable.upload_interval as u32).seconds())
.run(|| async {
if let Err(e) = S3::new().s3_sync().await {
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
warn!("failed to sync local data with object store. {:?}", e);
}
});
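The `select!` arms above implement a small supervisor: a dead object-store sync worker is joined and respawned, while a dead local-sync worker is fatal. A stand-alone sketch of the respawn idiom, with illustrative names and stand-in work:

```rust
use std::thread;

use tokio::sync::oneshot;

// The worker signals its own death through the oneshot sender.
fn spawn_worker() -> (thread::JoinHandle<()>, oneshot::Receiver<()>) {
    let (dead_tx, dead_rx) = oneshot::channel::<()>();
    let handle = thread::spawn(move || {
        // ... periodic sync work; on unrecoverable error, fall through ...
        let _ = dead_tx.send(());
    });
    (handle, dead_rx)
}

async fn supervise() {
    let (mut handle, mut dead) = spawn_worker();
    loop {
        // Awaiting `&mut dead` works because oneshot::Receiver is Unpin.
        let _ = (&mut dead).await;
        handle.join().unwrap_or(());
        // Destructuring assignment (Rust 1.59+), the same move as the
        // `(remote_sync_handler, ...) = object_store_sync()` arm above.
        (handle, dead) = spawn_worker();
    }
}
```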
4 changes: 2 additions & 2 deletions server/src/metadata.rs
@@ -116,8 +116,8 @@ impl STREAM_INFO {
map.remove(stream_name);
}

pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), LoadError> {
// When loading streams this function will assume list_streams only returns valid streams.
pub async fn load(&self, storage: &(impl ObjectStorage + ?Sized)) -> Result<(), LoadError> {
// When loading streams this function will assume list_streams only returns valid streams.
// a valid stream would have a .schema file.
// .schema file could be empty; in that case it will be treated as an uninitialized stream.
// return error in case of an error from object storage itself.
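The only signature change here is `&impl ObjectStorage` becoming `&(impl ObjectStorage + ?Sized)`. Relaxing the implicit `Sized` bound lets callers pass a trait-object reference, which is exactly what `main` now does with `&*storage`. A minimal demonstration with a stand-in trait:

```rust
trait Store {
    fn name(&self) -> &'static str;
}

struct Mem;
impl Store for Mem {
    fn name(&self) -> &'static str {
        "mem"
    }
}

// Without `+ ?Sized` this would only accept sized concrete types, and
// `&dyn Store` (what `&*Arc<dyn Store>` produces) would be rejected.
fn load(storage: &(impl Store + ?Sized)) -> &'static str {
    storage.name()
}

fn main() {
    let concrete = Mem;
    let object: &dyn Store = &Mem;
    assert_eq!(load(&concrete), load(object));
}
```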