Skip to content
24 changes: 14 additions & 10 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use url::Url;

use crate::{
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode}, storage::{AzureBlobConfig, FSConfig, S3Config},
option::{validation, Compression, Mode},
storage::{AzureBlobConfig, FSConfig, S3Config},
};

#[cfg(any(
Expand All @@ -43,7 +44,6 @@ use std::string::String as KafkaSslProtocol;
pub const DEFAULT_USERNAME: &str = "admin";
pub const DEFAULT_PASSWORD: &str = "admin";


#[derive(Parser)]
#[command(
name = "parseable",
Expand Down Expand Up @@ -81,11 +81,11 @@ pub struct Cli {
#[derive(Parser)]
pub enum StorageOptions {
#[command(name = "local-store")]
Local(LocalStoreArgs),
Local(LocalStoreArgs),

#[command(name = "s3-store")]
S3(S3StoreArgs),

#[command(name = "blob-store")]
Blob(BlobStoreArgs),
}
Expand Down Expand Up @@ -125,7 +125,7 @@ pub struct Options {

// Server configuration
#[arg(
long,
long,
env = "P_ADDR",
default_value = "0.0.0.0:8000",
value_parser = validation::socket_addr,
Expand Down Expand Up @@ -381,13 +381,13 @@ pub struct Options {
)]
pub audit_logger: Option<Url>,

#[arg(long ,env = "P_AUDIT_USERNAME", help = "Audit logger username")]
#[arg(long, env = "P_AUDIT_USERNAME", help = "Audit logger username")]
pub audit_username: Option<String>,

#[arg(long ,env = "P_AUDIT_PASSWORD", help = "Audit logger password")]
#[arg(long, env = "P_AUDIT_PASSWORD", help = "Audit logger password")]
pub audit_password: Option<String>,

#[arg(long ,env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
#[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
pub ms_clarity_tag: Option<String>,
}

Expand All @@ -405,7 +405,11 @@ impl Options {
}

pub fn openid(&self) -> Option<OpenidConfig> {
match (&self.oidc_client_id, &self.oidc_client_secret, &self.oidc_issuer) {
match (
&self.oidc_client_id,
&self.oidc_client_secret,
&self.oidc_issuer,
) {
(Some(id), Some(secret), Some(issuer)) => {
let origin = if let Some(url) = self.domain_address.clone() {
oidc::Origin::Production(url)
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl ParseableServer for QueryServer {
config
.service(
web::scope(&base_path())
.service(Server::get_date_bin())
.service(Server::get_correlation_webscope())
.service(Server::get_query_factory())
.service(Server::get_liveness_factory())
Expand Down
5 changes: 5 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl ParseableServer for Server {
.service(Self::get_llm_webscope())
.service(Self::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope())
.service(Self::get_date_bin())
.service(Self::get_metrics_webscope()),
)
.service(Self::get_ingest_otel_factory())
Expand Down Expand Up @@ -266,6 +267,10 @@ impl Server {
),
)
}
// POST "/datebin" ==> Get log-count density bucketed into time bins
pub fn get_date_bin() -> Resource {
    let handler = web::post().to(query::get_date_bin).authorize(Action::Query);
    web::resource("/datebin").route(handler)
}

// get the query factory
// POST "/query" ==> Get results of the SQL query passed in request body
Expand Down
56 changes: 50 additions & 6 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures_util::Future;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -39,7 +41,7 @@ use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::QueryResponse;
Expand All @@ -52,7 +54,7 @@ use crate::utils::user_auth_for_query;
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
Expand All @@ -66,7 +68,7 @@ pub struct Query {
pub filter_tags: Option<Vec<String>>,
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let session_state = QUERY_SESSION.state();
let raw_logical_plan = match session_state
.create_logical_plan(&query_request.query)
Expand All @@ -81,11 +83,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.await?
}
};

let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// create a visitor to extract the table names present in query
// Create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

Expand All @@ -103,6 +104,31 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
user_auth_for_query(&permissions, &tables)?;

let time = Instant::now();
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let date_bin_request = DateBinRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let date_bin_records = date_bin_request.get_bin_density().await?;
let response = if query_request.fields {
json!({
"fields": vec![&column_name],
"records": vec![json!({column_name: date_bin_records[0].log_count})]
})
} else {
Value::Array(vec![json!({column_name: date_bin_records[0].log_count})])
};

let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);

return Ok(HttpResponse::Ok().json(response));
}
let (records, fields) = query.execute(table_name.clone()).await?;

let response = QueryResponse {
Expand All @@ -122,6 +148,24 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
Ok(response)
}

/// Handler for POST "/datebin".
///
/// Computes log-count density over time bins for the requested stream and
/// returns it as a `DateBinResponse`. The caller's session must carry query
/// permission for that stream, otherwise an authorization error is returned.
pub async fn get_date_bin(
    req: HttpRequest,
    date_bin: Json<DateBinRequest>,
) -> Result<impl Responder, QueryError> {
    let session_key = extract_session_key_from_req(&req)?;
    let permissions = Users.get_permissions(&session_key);

    // Reject callers that lack query access to the target stream.
    user_auth_for_query(&permissions, &[date_bin.stream.clone()])?;

    let records = date_bin.get_bin_density().await?;

    let response = DateBinResponse {
        fields: vec!["date_bin_timestamp".into(), "log_count".into()],
        records,
    };
    Ok(web::Json(response))
}

pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), QueryError> {
if CONFIG.options.mode == Mode::Query {
for table in tables {
Expand Down
Loading
Loading