diff --git a/server/src/handlers.rs b/server/src/handlers.rs new file mode 100644 index 000000000..5dfb5e35e --- /dev/null +++ b/server/src/handlers.rs @@ -0,0 +1,25 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +pub mod http; + +const PREFIX_TAGS: &str = "x-p-tag-"; +const PREFIX_META: &str = "x-p-meta-"; +const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; +const FILL_NULL_OPTION_KEY: &str = "send_null"; +const SEPARATOR: char = '^'; diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs deleted file mode 100644 index f0acebb71..000000000 --- a/server/src/handlers/event.rs +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2023 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use actix_web::{web, HttpRequest, HttpResponse, Responder}; -use serde_json::Value; -use std::time::Instant; - -use crate::event; -use crate::metrics::QUERY_EXECUTE_TIME; -use crate::option::CONFIG; -use crate::query::Query; -use crate::response::QueryResponse; -use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; -use crate::utils::json::{flatten_json_body, merge}; - -use self::error::{PostError, QueryError}; - -const PREFIX_TAGS: &str = "x-p-tag-"; -const PREFIX_META: &str = "x-p-meta-"; -const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; -const FILL_NULL_OPTION_KEY: &str = "send_null"; -const SEPARATOR: char = '^'; - -pub async fn query( - _req: HttpRequest, - json: web::Json, -) -> Result { - let time = Instant::now(); - let json = json.into_inner(); - - let fill_null = json - .as_object() - .and_then(|map| map.get(FILL_NULL_OPTION_KEY)) - .and_then(|value| value.as_bool()) - .unwrap_or_default(); - - let query = Query::parse(json)?; - - let storage = CONFIG.storage().get_object_store(); - let query_result = query.execute(storage).await; - let query_result = query_result - .map(|(records, fields)| QueryResponse::new(records, fields, fill_null)) - .map(|response| response.to_http()) - .map_err(|e| e.into()); - - let time = time.elapsed().as_secs_f64(); - QUERY_EXECUTE_TIME - .with_label_values(&[query.stream_name.as_str()]) - .observe(time); - - query_result -} - -// Handler for POST /api/v1/ingest -// ingests events into the specified logstream in the header -// if the logstream does not exist, it is created -pub async fn ingest( - req: HttpRequest, - body: web::Json, -) -> Result { - if let Some((_, stream_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - { - let stream_name = stream_name.to_str().unwrap().to_owned(); - if let Err(e) = super::logstream::create_stream_if_not_exists(&stream_name).await { - return Err(PostError::CreateStream(e.into())); - } - push_logs(stream_name, req, body).await?; - Ok(HttpResponse::Ok().finish()) - } else { - Err(PostError::Header(ParseHeaderError::MissingStreamName)) - } -} - -// Handler for POST /api/v1/logstream/{logstream} -// only ingests events into the specified logstream -// fails if the logstream does not exist -pub async fn post_event( - req: HttpRequest, - body: web::Json, -) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - push_logs(stream_name, req, body).await?; - Ok(HttpResponse::Ok().finish()) -} - -async fn push_logs( - stream_name: String, - req: HttpRequest, - body: web::Json, -) -> Result<(), PostError> { - let tags_n_metadata = [ - ( - "p_tags".to_string(), - Value::String(collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?), - ), - ( - "p_metadata".to_string(), - Value::String(collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?), - ), - ]; - - match body.0 { - Value::Array(array) => { - for mut body in array { - merge(&mut body, tags_n_metadata.clone().into_iter()); - let body = flatten_json_body(&body).unwrap(); - let schema_key = event::get_schema_key(&body); - - let event = event::Event { - body, - stream_name: stream_name.clone(), - schema_key, - }; - - event.process().await?; - } - } - mut body @ Value::Object(_) => { - merge(&mut body, tags_n_metadata.into_iter()); - let body = flatten_json_body(&body).unwrap(); - let schema_key = event::get_schema_key(&body); - let event = event::Event { - body, - stream_name, - schema_key, - }; - - event.process().await?; - } - _ => return Err(PostError::Invalid), - } - - Ok(()) -} - -pub mod error { - use actix_web::http::header::ContentType; - use http::StatusCode; - - use crate::{ - event::error::EventError, - query::error::{ExecuteError, ParseError}, - utils::header_parsing::ParseHeaderError, - }; - - #[derive(Debug, thiserror::Error)] - pub enum QueryError { - #[error("Bad request: {0}")] - Parse(#[from] ParseError), - #[error("Query execution failed due to {0}")] - Execute(#[from] ExecuteError), - } - - impl actix_web::ResponseError for QueryError { - fn status_code(&self) -> http::StatusCode { - match self { - QueryError::Parse(_) => StatusCode::BAD_REQUEST, - QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR, - } - } - - fn error_response(&self) -> actix_web::HttpResponse { - actix_web::HttpResponse::build(self.status_code()) - .insert_header(ContentType::plaintext()) - .body(self.to_string()) - } - } - - #[derive(Debug, thiserror::Error)] - pub enum PostError { - #[error("Header Error: {0}")] - Header(#[from] ParseHeaderError), - #[error("Event Error: {0}")] - Event(#[from] EventError), - #[error("Invalid Request")] - Invalid, - #[error("Failed to create stream due to {0}")] - CreateStream(Box), - } - - impl actix_web::ResponseError for PostError { - fn status_code(&self) -> http::StatusCode { - match self { - PostError::Header(_) => StatusCode::BAD_REQUEST, - PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR, - PostError::Invalid => StatusCode::BAD_REQUEST, - PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, - } - } - - fn error_response(&self) -> actix_web::HttpResponse { - actix_web::HttpResponse::build(self.status_code()) - .insert_header(ContentType::plaintext()) - .body(self.to_string()) - } - } -} diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs new file mode 100644 index 000000000..765aad5cc --- /dev/null +++ b/server/src/handlers/http.rs @@ -0,0 +1,196 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::fs::File; +use std::io::BufReader; + +use actix_cors::Cors; +use actix_web::dev::ServiceRequest; +use actix_web::{middleware, web, App, HttpServer}; +use actix_web_httpauth::extractors::basic::BasicAuth; +use actix_web_httpauth::middleware::HttpAuthentication; +use actix_web_prometheus::PrometheusMetrics; +use actix_web_static_files::ResourceFiles; +use rustls::{Certificate, PrivateKey, ServerConfig}; +use rustls_pemfile::{certs, pkcs8_private_keys}; + +use crate::option::CONFIG; + +mod health_check; +mod ingest; +mod logstream; +mod query; + +include!(concat!(env!("OUT_DIR"), "/generated.rs")); + +const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; +const API_BASE_PATH: &str = "/api"; +const API_VERSION: &str = "v1"; + +#[macro_export] +macro_rules! create_app { + ($prometheus: expr) => { + App::new() + .wrap($prometheus.clone()) + .configure(|cfg| configure_routes(cfg)) + .wrap(middleware::Logger::default()) + .wrap(middleware::Compress::default()) + .wrap( + Cors::default() + .allow_any_header() + .allow_any_method() + .allow_any_origin(), + ) + }; +} + +async fn validator( + req: ServiceRequest, + credentials: BasicAuth, +) -> Result { + if credentials.user_id().trim() == CONFIG.parseable.username + && credentials.password().unwrap().trim() == CONFIG.parseable.password + { + return Ok(req); + } + + Err((actix_web::error::ErrorUnauthorized("Unauthorized"), req)) +} + +pub async fn run_http(prometheus: PrometheusMetrics) -> anyhow::Result<()> { + let ssl_acceptor = match ( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + ) { + (Some(cert), Some(key)) => { + // init server config builder with safe defaults + let config = ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth(); + + // load TLS key/cert files + let cert_file = &mut BufReader::new(File::open(cert)?); + let key_file = &mut BufReader::new(File::open(key)?); + + // convert files to key/cert objects + let cert_chain = certs(cert_file)?.into_iter().map(Certificate).collect(); + + let mut keys: Vec = pkcs8_private_keys(key_file)? + .into_iter() + .map(PrivateKey) + .collect(); + + // exit if no keys could be parsed + if keys.is_empty() { + anyhow::bail!("Could not locate PKCS 8 private keys."); + } + + let server_config = config.with_single_cert(cert_chain, keys.remove(0))?; + + Some(server_config) + } + (_, _) => None, + }; + + // concurrent workers equal to number of cores on the cpu + let http_server = HttpServer::new(move || create_app!(prometheus)).workers(num_cpus::get()); + if let Some(config) = ssl_acceptor { + http_server + .bind_rustls(&CONFIG.parseable.address, config)? + .run() + .await?; + } else { + http_server.bind(&CONFIG.parseable.address)?.run().await?; + } + + Ok(()) +} + +pub fn configure_routes(cfg: &mut web::ServiceConfig) { + let generated = generate(); + + let logstream_api = web::scope("/{logstream}") + .service( + web::resource("") + // PUT "/logstream/{logstream}" ==> Create log stream + .route(web::put().to(logstream::put_stream)) + // POST "/logstream/{logstream}" ==> Post logs to given log stream + .route(web::post().to(ingest::post_event)) + // DELETE "/logstream/{logstream}" ==> Delete log stream + .route(web::delete().to(logstream::delete)) + .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) + .service( + web::resource("/alert") + // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream + .route(web::put().to(logstream::put_alert)) + // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream + .route(web::get().to(logstream::get_alert)), + ) + .service( + // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream + web::resource("/schema").route(web::get().to(logstream::schema)), + ) + .service( + // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream + web::resource("/stats").route(web::get().to(logstream::get_stats)), + ) + .service( + // GET "/logstream/{logstream}/retention" ==> Set retention for given logstream + web::resource("/retention").route(web::put().to(logstream::put_retention)), + ); + + cfg.service( + // Base path "{url}/api/v1" + web::scope(&base_path()) + // POST "/query" ==> Get results of the SQL query passed in request body + .service(web::resource("/query").route(web::post().to(query::query))) + // POST "/ingest" ==> Post logs to given log stream based on header + .service( + web::resource("/ingest") + .route(web::post().to(ingest::ingest)) + .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) + // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command + .service(web::resource("/liveness").route(web::get().to(health_check::liveness))) + // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes + .service(web::resource("/readiness").route(web::get().to(health_check::readiness))) + .service( + web::scope("/logstream") + .service( + // GET "/logstream" ==> Get list of all Log Streams on the server + web::resource("").route(web::get().to(logstream::list)), + ) + .service( + // logstream API + logstream_api, + ), + ) + .wrap(HttpAuthentication::basic(validator)), + ) + // GET "/" ==> Serve the static frontend directory + .service(ResourceFiles::new("/", generated)); +} + +fn base_path() -> String { + format!("{API_BASE_PATH}/{API_VERSION}") +} + +pub fn metrics_path() -> String { + format!("{}/metrics", base_path()) +} diff --git a/server/src/handlers/mod.rs b/server/src/handlers/http/health_check.rs similarity index 96% rename from server/src/handlers/mod.rs rename to server/src/handlers/http/health_check.rs index a513fc05c..88197d352 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/http/health_check.rs @@ -16,9 +16,6 @@ * */ -pub mod event; -pub mod logstream; - use actix_web::http::StatusCode; use actix_web::HttpResponse; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs new file mode 100644 index 000000000..1a5ce607f --- /dev/null +++ b/server/src/handlers/http/ingest.rs @@ -0,0 +1,139 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::http::header::ContentType; +use actix_web::{web, HttpRequest, HttpResponse}; +use http::StatusCode; +use serde_json::Value; + +use crate::event; +use crate::event::error::EventError; +use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY}; +use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; +use crate::utils::json::{flatten_json_body, merge}; + +pub async fn ingest( + req: HttpRequest, + body: web::Json, +) -> Result { + if let Some((_, stream_name)) = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + { + let stream_name = stream_name.to_str().unwrap().to_owned(); + if let Err(e) = super::logstream::create_stream_if_not_exists(&stream_name).await { + return Err(PostError::CreateStream(e.into())); + } + push_logs(stream_name, req, body).await?; + Ok(HttpResponse::Ok().finish()) + } else { + Err(PostError::Header(ParseHeaderError::MissingStreamName)) + } +} + +// Handler for POST /api/v1/logstream/{logstream} +// only ingests events into the specified logstream +// fails if the logstream does not exist +pub async fn post_event( + req: HttpRequest, + body: web::Json, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + push_logs(stream_name, req, body).await?; + Ok(HttpResponse::Ok().finish()) +} + +async fn push_logs( + stream_name: String, + req: HttpRequest, + body: web::Json, +) -> Result<(), PostError> { + let tags_n_metadata = [ + ( + "p_tags".to_string(), + Value::String(collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?), + ), + ( + "p_metadata".to_string(), + Value::String(collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?), + ), + ]; + + match body.0 { + Value::Array(array) => { + for mut body in array { + merge(&mut body, tags_n_metadata.clone().into_iter()); + let body = flatten_json_body(&body).unwrap(); + let schema_key = event::get_schema_key(&body); + + let event = event::Event { + body, + stream_name: stream_name.clone(), + schema_key, + }; + + event.process().await?; + } + } + mut body @ Value::Object(_) => { + merge(&mut body, tags_n_metadata.into_iter()); + let body = flatten_json_body(&body).unwrap(); + let schema_key = event::get_schema_key(&body); + let event = event::Event { + body, + stream_name, + schema_key, + }; + + event.process().await?; + } + _ => return Err(PostError::Invalid), + } + + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub enum PostError { + #[error("Header Error: {0}")] + Header(#[from] ParseHeaderError), + #[error("Event Error: {0}")] + Event(#[from] EventError), + #[error("Invalid Request")] + Invalid, + #[error("Failed to create stream due to {0}")] + CreateStream(Box), +} + +impl actix_web::ResponseError for PostError { + fn status_code(&self) -> http::StatusCode { + match self { + PostError::Header(_) => StatusCode::BAD_REQUEST, + PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::Invalid => StatusCode::BAD_REQUEST, + PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/http/logstream.rs similarity index 100% rename from server/src/handlers/logstream.rs rename to server/src/handlers/http/logstream.rs diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs new file mode 100644 index 000000000..056c29052 --- /dev/null +++ b/server/src/handlers/http/query.rs @@ -0,0 +1,83 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::http::header::ContentType; +use actix_web::{web, HttpRequest, Responder}; +use http::StatusCode; +use serde_json::Value; +use std::time::Instant; + +use crate::handlers::FILL_NULL_OPTION_KEY; +use crate::metrics::QUERY_EXECUTE_TIME; +use crate::option::CONFIG; +use crate::query::error::{ExecuteError, ParseError}; +use crate::query::Query; +use crate::response::QueryResponse; + +pub async fn query( + _req: HttpRequest, + json: web::Json, +) -> Result { + let time = Instant::now(); + let json = json.into_inner(); + + let fill_null = json + .as_object() + .and_then(|map| map.get(FILL_NULL_OPTION_KEY)) + .and_then(|value| value.as_bool()) + .unwrap_or_default(); + + let query = Query::parse(json)?; + + let storage = CONFIG.storage().get_object_store(); + let query_result = query.execute(storage).await; + let query_result = query_result + .map(|(records, fields)| QueryResponse::new(records, fields, fill_null)) + .map(|response| response.to_http()) + .map_err(|e| e.into()); + + let time = time.elapsed().as_secs_f64(); + QUERY_EXECUTE_TIME + .with_label_values(&[query.stream_name.as_str()]) + .observe(time); + + query_result +} + +#[derive(Debug, thiserror::Error)] +pub enum QueryError { + #[error("Bad request: {0}")] + Parse(#[from] ParseError), + #[error("Query execution failed due to {0}")] + Execute(#[from] ExecuteError), +} + +impl actix_web::ResponseError for QueryError { + fn status_code(&self) -> http::StatusCode { + match self { + QueryError::Parse(_) => StatusCode::BAD_REQUEST, + QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 0fbf490e6..f98c488d2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -16,17 +16,7 @@ * */ -use actix_cors::Cors; -use actix_web::dev::ServiceRequest; -use actix_web::{middleware, web, App, HttpServer}; -use actix_web_httpauth::extractors::basic::BasicAuth; -use actix_web_httpauth::middleware::HttpAuthentication; -use actix_web_prometheus::PrometheusMetrics; -use actix_web_static_files::ResourceFiles; use clokwerk::{AsyncScheduler, Scheduler, TimeUnits}; -use log::warn; -use rustls::{Certificate, PrivateKey, ServerConfig}; -use rustls_pemfile::{certs, pkcs8_private_keys}; use thread_priority::{ThreadBuilder, ThreadPriority}; use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; @@ -36,11 +26,6 @@ use pyroscope::PyroscopeAgent; #[cfg(feature = "debug")] use pyroscope_pprofrs::{pprof_backend, PprofConfig}; -include!(concat!(env!("OUT_DIR"), "/generated.rs")); - -use std::env; -use std::fs::File; -use std::io::BufReader; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::thread::{self, JoinHandle}; use std::time::Duration; @@ -62,10 +47,6 @@ mod validator; use option::CONFIG; -const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; -const API_BASE_PATH: &str = "/api"; -const API_VERSION: &str = "v1"; - #[cfg(feature = "debug")] const DEBUG_PYROSCOPE_URL: &str = "P_PROFILE_PYROSCOPE_URL"; #[cfg(feature = "debug")] @@ -95,7 +76,7 @@ async fn main() -> anyhow::Result<()> { } if let Err(e) = metadata::STREAM_INFO.load(&*storage).await { - warn!("could not populate local metadata. {:?}", e); + log::warn!("could not populate local metadata. {:?}", e); } // track all parquet files already in the data directory @@ -108,7 +89,7 @@ async fn main() -> anyhow::Result<()> { let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = object_store_sync(); - let app = run_http(prometheus); + let app = handlers::http::run_http(prometheus); tokio::pin!(app); loop { tokio::select! { @@ -162,7 +143,7 @@ fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sende .every((CONFIG.parseable.upload_interval as u32).seconds()) .run(|| async { if let Err(e) = CONFIG.storage().get_object_store().sync().await { - warn!("failed to sync local data with object store. {:?}", e); + log::warn!("failed to sync local data with object store. {:?}", e); } }); @@ -207,7 +188,7 @@ fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<( .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) .run(move || { if let Err(e) = crate::event::STREAM_WRITERS::unset_all() { - warn!("failed to sync local data. {:?}", e); + log::warn!("failed to sync local data. {:?}", e); } }); @@ -233,191 +214,3 @@ fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<( (handle, outbox_rx, inbox_tx) } - -async fn validator( - req: ServiceRequest, - credentials: BasicAuth, -) -> Result { - if credentials.user_id().trim() == CONFIG.parseable.username - && credentials.password().unwrap().trim() == CONFIG.parseable.password - { - return Ok(req); - } - - Err((actix_web::error::ErrorUnauthorized("Unauthorized"), req)) -} - -async fn run_http(prometheus: PrometheusMetrics) -> anyhow::Result<()> { - let ssl_acceptor = match ( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - ) { - (Some(cert), Some(key)) => { - // init server config builder with safe defaults - let config = ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth(); - - // load TLS key/cert files - let cert_file = &mut BufReader::new(File::open(cert)?); - let key_file = &mut BufReader::new(File::open(key)?); - - // convert files to key/cert objects - let cert_chain = certs(cert_file)?.into_iter().map(Certificate).collect(); - - let mut keys: Vec = pkcs8_private_keys(key_file)? - .into_iter() - .map(PrivateKey) - .collect(); - - // exit if no keys could be parsed - if keys.is_empty() { - anyhow::bail!("Could not locate PKCS 8 private keys."); - } - - let server_config = config.with_single_cert(cert_chain, keys.remove(0))?; - - Some(server_config) - } - (_, _) => None, - }; - - // concurrent workers equal to number of cores on the cpu - let http_server = HttpServer::new(move || create_app!(prometheus)).workers(num_cpus::get()); - if let Some(config) = ssl_acceptor { - http_server - .bind_rustls(&CONFIG.parseable.address, config)? - .run() - .await?; - } else { - http_server.bind(&CONFIG.parseable.address)?.run().await?; - } - - Ok(()) -} - -pub fn configure_routes(cfg: &mut web::ServiceConfig) { - let generated = generate(); - - cfg.service( - // Base path "{url}/api/v1" - web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body - .service(web::resource(query_path()).route(web::post().to(handlers::event::query))) - // POST "/ingest" ==> Post logs to given log stream based on header - .service( - web::resource(ingest_path()) - .route(web::post().to(handlers::event::ingest)) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) - .service( - // logstream API - web::resource(logstream_path("{logstream}")) - // PUT "/logstream/{logstream}" ==> Create log stream - .route(web::put().to(handlers::logstream::put_stream)) - // POST "/logstream/{logstream}" ==> Post logs to given log stream - .route(web::post().to(handlers::event::post_event)) - // DELETE "/logstream/{logstream}" ==> Delete log stream - .route(web::delete().to(handlers::logstream::delete)) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) - .service( - web::resource(alert_path("{logstream}")) - // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream - .route(web::put().to(handlers::logstream::put_alert)) - // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream - .route(web::get().to(handlers::logstream::get_alert)), - ) - // GET "/logstream" ==> Get list of all Log Streams on the server - .service( - web::resource(logstream_path("")).route(web::get().to(handlers::logstream::list)), - ) - .service( - // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource(schema_path("{logstream}")) - .route(web::get().to(handlers::logstream::schema)), - ) - .service( - // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream - web::resource(stats_path("{logstream}")) - .route(web::get().to(handlers::logstream::get_stats)), - ) - .service( - // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream - web::resource(retention_path("{logstream}")) - .route(web::put().to(handlers::logstream::put_retention)), - ) - // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command - .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness))) - // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes - .service(web::resource(readiness_path()).route(web::get().to(handlers::readiness))) - .wrap(HttpAuthentication::basic(validator)), - ) - // GET "/" ==> Serve the static frontend directory - .service(ResourceFiles::new("/", generated)); -} - -#[macro_export] -macro_rules! create_app { - ($prometheus: expr) => { - App::new() - .wrap($prometheus.clone()) - .configure(|cfg| configure_routes(cfg)) - .wrap(middleware::Logger::default()) - .wrap(middleware::Compress::default()) - .wrap( - Cors::default() - .allow_any_header() - .allow_any_method() - .allow_any_origin(), - ) - }; -} - -fn base_path() -> String { - format!("{API_BASE_PATH}/{API_VERSION}") -} - -fn logstream_path(stream_name: &str) -> String { - if stream_name.is_empty() { - "/logstream".to_string() - } else { - format!("/logstream/{stream_name}") - } -} - -fn readiness_path() -> String { - "/readiness".to_string() -} - -fn liveness_path() -> String { - "/liveness".to_string() -} - -fn query_path() -> String { - "/query".to_string() -} - -fn ingest_path() -> String { - "/ingest".to_string() -} - -pub fn metrics_path() -> String { - format!("{}/metrics", base_path()) -} - -fn alert_path(stream_name: &str) -> String { - format!("{}/alert", logstream_path(stream_name)) -} - -fn schema_path(stream_name: &str) -> String { - format!("{}/schema", logstream_path(stream_name)) -} - -fn stats_path(stream_name: &str) -> String { - format!("{}/stats", logstream_path(stream_name)) -} - -fn retention_path(stream_name: &str) -> String { - format!("{}/retention", logstream_path(stream_name)) -} diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 2666b831b..820d8f0f9 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -22,7 +22,7 @@ use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder}; use lazy_static::lazy_static; use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry}; -use crate::{metadata::STREAM_INFO, metrics_path}; +use crate::{handlers::http::metrics_path, metadata::STREAM_INFO}; pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");