74 changes: 71 additions & 3 deletions server/src/handlers/http.rs
@@ -17,6 +17,10 @@
*/

use actix_cors::Cors;
use arrow_schema::Schema;
use serde_json::Value;

use self::{modal::query_server::QueryServer, query::Query};

pub(crate) mod about;
pub(crate) mod health_check;
@@ -33,11 +37,11 @@ pub(crate) mod rbac;
pub(crate) mod role;

pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
pub const API_BASE_PATH: &str = "/api";
pub const API_BASE_PATH: &str = "api";
pub const API_VERSION: &str = "v1";

pub(crate) fn base_path() -> String {
format!("{API_BASE_PATH}/{API_VERSION}")
format!("/{API_BASE_PATH}/{API_VERSION}")
}

pub fn metrics_path() -> String {
@@ -53,5 +57,69 @@ pub(crate) fn cross_origin_config() -> Cors {
}

pub fn base_path_without_preceding_slash() -> String {
base_path().trim_start_matches('/').to_string()
format!("{API_BASE_PATH}/{API_VERSION}")
}
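
The leading slash now lives in `base_path()` rather than in `API_BASE_PATH`, so both helpers derive from the same constants and the old `trim_start_matches` round-trip goes away. A quick check of what the two helpers return with these definitions:

```rust
assert_eq!(base_path(), "/api/v1");
assert_eq!(base_path_without_preceding_slash(), "api/v1");
```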

pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
let mut res = vec![];
let ima = QueryServer::get_ingester_info().await?;

for im in ima {
let uri = format!(
"{}{}/logstream/{}/schema",
im.domain_name,
base_path_without_preceding_slash(),
stream_name
);
let reqw = reqwest::Client::new()
.get(uri)
.header(http::header::AUTHORIZATION, im.token.clone())
.header(http::header::CONTENT_TYPE, "application/json")
.send()
.await?;

if reqw.status().is_success() {
let v = serde_json::from_slice(&reqw.bytes().await?)?;
res.push(v);
}
}

let new_schema = Schema::try_merge(res)?;

Ok(new_schema)
}

pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingester
let mut res = vec![];
let ima = QueryServer::get_ingester_info().await?;

for im in ima.iter() {
let uri = format!(
"{}{}/{}",
im.domain_name,
base_path_without_preceding_slash(),
"query"
);
let reqw = reqwest::Client::new()
.post(uri)
.json(query)
.header(http::header::AUTHORIZATION, im.token.clone())
.header(http::header::CONTENT_TYPE, "application/json")
.send()
.await?;

if reqw.status().is_success() {
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
// the value returned is an array of json objects
// so it needs to be flattened
if let Some(arr) = v.as_array() {
for val in arr {
res.push(val.to_owned())
}
}
}
}

Ok(res)
}
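
For context on the `Schema::try_merge` call above, here is a minimal standalone sketch of how merging behaves when ingesters report overlapping field sets (the stream columns are made up for illustration):

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), arrow_schema::ArrowError> {
    // Two ingesters may have seen different columns for the same stream.
    let a = Schema::new(vec![Field::new("ts", DataType::Utf8, true)]);
    let b = Schema::new(vec![
        Field::new("ts", DataType::Utf8, true),
        Field::new("level", DataType::Utf8, true),
    ]);

    // try_merge unions compatible fields and errors on type conflicts,
    // which is why fetch_schema propagates the Result with `?`.
    let merged = Schema::try_merge(vec![a, b])?;
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}
```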
32 changes: 9 additions & 23 deletions server/src/handlers/http/modal/ingest_server.rs
@@ -126,29 +126,7 @@ impl IngestServer {
.service(Server::get_about_factory()),
)
.service(Server::get_liveness_factory())
.service(Server::get_readiness_factory())
.service(Self::get_metrics_webscope());
}

fn get_metrics_webscope() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}")
.service(
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
web::get()
.to(logstream::schema)
.authorize_for_stream(Action::GetSchema),
),
)
.service(
web::resource("/stats").route(
web::get()
.to(logstream::get_stats)
.authorize_for_stream(Action::GetStats),
),
),
)
.service(Server::get_readiness_factory());
}

fn logstream_api() -> Scope {
@@ -176,6 +154,14 @@
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
)
.service(
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
web::get()
.to(logstream::schema)
.authorize_for_stream(Action::GetSchema),
),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource("/stats").route(
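
The change above folds the schema and stats routes into the existing `/logstream/{logstream}` scope instead of keeping a separate metrics scope. A minimal standalone sketch of that nesting pattern in actix-web (the handler body and bind address are placeholders, not the PR's code):

```rust
use actix_web::{web, App, HttpResponse, HttpServer};

async fn schema(path: web::Path<String>) -> HttpResponse {
    HttpResponse::Ok().body(format!("schema for stream {}", path.into_inner()))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new().service(
            web::scope("/logstream").service(
                web::scope("/{logstream}")
                    // Resolves as GET /logstream/{logstream}/schema
                    .service(web::resource("/schema").route(web::get().to(schema))),
            ),
        )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}
```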
28 changes: 10 additions & 18 deletions server/src/handlers/http/modal/query_server.rs
@@ -35,11 +35,8 @@ use relative_path::RelativePathBuf;
use reqwest::Response;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use url::Url;

use tokio::fs::File as TokioFile;

use crate::option::CONFIG;

use super::server::Server;
Expand Down Expand Up @@ -170,10 +167,6 @@ impl QueryServer {

// TODO: add validation logic here
// validate the ingester metadata

let mut f = Self::get_meta_file().await;
// write the arr to f
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
Ok(arr)
}

Expand Down Expand Up @@ -224,8 +217,11 @@ impl QueryServer {
/// initialize the server, run migrations as needed and start the server
async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;

let metadata = storage::resolve_parseable_metadata().await?;
// do not commit the below line
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;

banner::print(&CONFIG, &metadata).await;

// initialize the rbac map
Expand Down Expand Up @@ -276,17 +272,6 @@ impl QueryServer {
}
}

async fn get_meta_file() -> TokioFile {
let meta_path = CONFIG.staging_dir().join(".query.json");

tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.open(meta_path)
.await
.unwrap()
}

// forward the request to all ingesters to keep them in sync
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
Expand Down Expand Up @@ -324,6 +309,7 @@ impl QueryServer {
stream_name
);

// roll back the stream creation
Self::send_stream_rollback_request(&url, ingester.clone()).await?;
}

@@ -337,6 +323,7 @@
Ok(())
}

/// get the cumulative stats from all ingesters
pub async fn fetch_stats_from_ingesters(
stream_name: &str,
) -> Result<QueriedStats, StreamError> {
Expand Down Expand Up @@ -391,6 +378,7 @@ impl QueryServer {
Ok(stats)
}

/// send a request to the ingester to fetch its stats
async fn send_stats_request(
url: &str,
ingester: IngesterMetadata,
Expand Down Expand Up @@ -488,6 +476,7 @@ impl QueryServer {
Ok(())
}

/// send a rollback request to all ingesters
async fn send_stream_rollback_request(
url: &str,
ingester: IngesterMetadata,
@@ -504,6 +493,7 @@
.send()
.await
.map_err(|err| {
// log the error and return a custom error
log::error!(
"Fatal: failed to rollback stream creation: {}\n Error: {:?}",
ingester.domain_name,
@@ -518,6 +508,8 @@
}
})?;

// if the response is not successful, log the error and return a custom error
// this could be a bit too much, but we need to be sure it covers all cases
if !resp.status().is_success() {
log::error!(
"failed to rollback stream creation: {}\nResponse Returned: {:?}",
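
The sync and rollback methods above all follow the same fan-out shape: iterate the ingesters, fire an authenticated request at each, and treat any non-success status as a failure. A condensed sketch of that pattern (the `Ingester` struct and the endpoint path are placeholders, not the PR's actual types):

```rust
use serde_json::Value;

struct Ingester {
    domain_name: String,
    token: String,
}

async fn fan_out(ingesters: &[Ingester], body: &Value) -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    for ing in ingesters {
        // domain_name is assumed to end with a trailing slash, as in the diff.
        let url = format!("{}api/v1/logstream/demo", ing.domain_name);
        let resp = client
            .put(&url)
            .header(http::header::AUTHORIZATION, ing.token.clone())
            .header(http::header::CONTENT_TYPE, "application/json")
            .json(body)
            .send()
            .await?;
        if !resp.status().is_success() {
            // Surface the failing ingester so the caller can roll back.
            anyhow::bail!("ingester {} returned {}", ing.domain_name, resp.status());
        }
    }
    Ok(())
}
```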
94 changes: 89 additions & 5 deletions server/src/handlers/http/query.rs
@@ -20,24 +20,33 @@ use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures_util::Future;
use http::StatusCode;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::QUERY_SESSION;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::utils::actix::extract_session_key_from_req;

use super::send_query_request_to_ingester;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Query {
query: String,
@@ -52,11 +61,49 @@ pub struct Query {
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);
let session_state = QUERY_SESSION.state();

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&query_request.query)
.await?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let table_name = visitor.into_inner().pop().ok_or_else(|| QueryError::Custom("no table found in query".to_string()))?;

if CONFIG.parseable.mode == Mode::Query {
if let Ok(new_schema) = fetch_schema(&table_name).await {
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(|err| {
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
})?;
commit_schema(&table_name, Arc::new(new_schema))
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
}
}

let mut query = into_query(&query_request, &session_state).await?;

// TODO: only run this when the window between the query start time and now is under one minute plus a margin
let mmem = if CONFIG.parseable.mode == Mode::Query {
// create a new query to send to the ingesters
if let Some(que) = transform_query_for_ingester(&query_request) {
let vals = send_query_request_to_ingester(&que)
.await
.map_err(|err| QueryError::Custom(err.to_string()))?;
Some(vals)
} else {
None
}
} else {
None
};

let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);

// check authorization of this query if it references physical table;
let table_name = query.table_name();
if let Some(ref table) = table_name {
@@ -101,7 +148,7 @@
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http();
.to_http(mmem);

if let Some(table) = table_name {
let time = time.elapsed().as_secs_f64();
@@ -183,6 +230,41 @@ async fn into_query(
})
}

fn transform_query_for_ingester(query: &Query) -> Option<Query> {
if query.query.is_empty() {
return None;
}

if query.start_time.is_empty() {
return None;
}

if query.end_time.is_empty() {
return None;
}

let end_time: DateTime<Utc> = if query.end_time == "now" {
Utc::now()
} else {
DateTime::parse_from_rfc3339(&query.end_time)
.ok()?
.with_timezone(&Utc)
};

let start_time = end_time - chrono::Duration::minutes(1);
// when transforming the query, the ingesters are forced to return an array of values
let q = Query {
query: query.query.clone(),
fields: false,
filter_tags: query.filter_tags.clone(),
send_null: query.send_null,
start_time: start_time.to_rfc3339(),
end_time: end_time.to_rfc3339(),
};

Some(q)
}

#[derive(Debug, thiserror::Error)]
pub enum QueryError {
#[error("Query cannot be empty")]
@@ -207,6 +289,8 @@ pub enum QueryError {
Datafusion(#[from] DataFusionError),
#[error("Execution Error: {0}")]
Execute(#[from] ExecuteError),
#[error("Error: {0}")]
Custom(String),
}

impl actix_web::ResponseError for QueryError {
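
One detail worth calling out in `transform_query_for_ingester`: the forwarded query is clamped to the one-minute window ending at `end_time`, presumably because ingesters only hold data that has not yet been synced to object storage. A standalone sketch of just that windowing logic (not wired to the real `Query` type):

```rust
use chrono::{DateTime, Duration, Utc};

fn ingester_window(end: &str) -> Option<(String, String)> {
    // Mirrors transform_query_for_ingester: "now" means the current instant,
    // anything else must parse as RFC 3339.
    let end_time: DateTime<Utc> = if end == "now" {
        Utc::now()
    } else {
        DateTime::parse_from_rfc3339(end).ok()?.with_timezone(&Utc)
    };
    let start_time = end_time - Duration::minutes(1);
    Some((start_time.to_rfc3339(), end_time.to_rfc3339()))
}

fn main() {
    let (start, end) = ingester_window("now").unwrap();
    println!("querying ingesters from {start} to {end}");
}
```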