Skip to content

Commit 9fda908

Browse files
committed
Query Server Impl WIP
1 parent b663831 commit 9fda908

File tree

5 files changed

+74
-7
lines changed

5 files changed

+74
-7
lines changed

server/src/handlers/http.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
*/
1818

1919
use actix_cors::Cors;
20+
use serde_json::Value;
21+
22+
use self::{modal::query_server::QueryServer, query::Query};
2023

2124
pub(crate) mod about;
2225
pub(crate) mod health_check;
@@ -51,3 +54,26 @@ pub(crate) fn cross_origin_config() -> Cors {
5154
Cors::default().block_on_origin_mismatch(false)
5255
}
5356
}
57+
58+
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
59+
// send the query request to the ingestor
60+
let mut res = vec![];
61+
let ima = QueryServer::get_ingestor_info().await.unwrap();
62+
63+
for im in ima {
64+
let uri = format!("{}{}/{}",im.domain_name, base_path(), "query");
65+
let reqw = reqwest::Client::new()
66+
.post(uri)
67+
.json(query)
68+
.basic_auth("admin", Some("admin"))
69+
.send()
70+
.await?;
71+
72+
if reqw.status().is_success() {
73+
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
74+
res.push(v);
75+
}
76+
}
77+
78+
Ok(res)
79+
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl QueryServer {
158158

159159
let mut f = Self::get_meta_file().await;
160160
// writer the arr in f
161-
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
161+
let write_size = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
162162
Ok(arr)
163163
}
164164

@@ -177,8 +177,9 @@ impl QueryServer {
177177
/// initialize the server, run migrations as needed and start the server
178178
async fn initialize(&self) -> anyhow::Result<()> {
179179
migration::run_metadata_migration(&CONFIG).await?;
180-
let metadata = storage::resolve_parseable_metadata().await?;
181180
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
181+
182+
let metadata = storage::resolve_parseable_metadata().await?;
182183
banner::print(&CONFIG, &metadata).await;
183184

184185
// initialize the rbac map

server/src/handlers/http/query.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
22-
use chrono::{DateTime, Utc};
22+
use chrono::{DateTime, Timelike, Utc};
2323
use datafusion::error::DataFusionError;
2424
use datafusion::execution::context::SessionState;
2525
use futures_util::Future;
@@ -36,8 +36,10 @@ use crate::rbac::Users;
3636
use crate::response::QueryResponse;
3737
use crate::utils::actix::extract_session_key_from_req;
3838

39+
use super::send_query_request_to_ingestor;
40+
3941
/// Query Request through http endpoint.
40-
#[derive(Debug, serde::Deserialize)]
42+
#[derive(Debug, serde::Deserialize, serde::Serialize)]
4143
#[serde(rename_all = "camelCase")]
4244
pub struct Query {
4345
query: String,
@@ -52,6 +54,15 @@ pub struct Query {
5254
}
5355

5456
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
57+
// create a new query to send to the ingestors
58+
let mut mmem= vec![];
59+
if let Some(query) = transform_query_for_ingestor(&query_request) {
60+
mmem = send_query_request_to_ingestor(&query)
61+
.await
62+
.map_err(|err| QueryError::Custom(err.to_string()))?;
63+
}
64+
65+
// let rbj = arrow_json::ReaderBuilder::new();
5566
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
5667
let permissions = Users.get_permissions(&creds);
5768
let session_state = QUERY_SESSION.state();
@@ -94,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
94105

95106
let time = Instant::now();
96107

97-
let (records, fields) = query.execute().await?;
108+
let (records, fields) = query.execute(Some(mmem)).await?;
98109
let response = QueryResponse {
99110
records,
100111
fields,
@@ -183,6 +194,24 @@ async fn into_query(
183194
})
184195
}
185196

197+
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
198+
let end_time = DateTime::parse_from_rfc3339(&query.end_time).ok()?;
199+
let start_time = end_time - chrono::Duration::minutes(1);
200+
201+
dbg!(start_time.minute());
202+
203+
let q = Query {
204+
query: query.query.clone(),
205+
fields: query.fields,
206+
filter_tags: query.filter_tags.clone(),
207+
send_null: query.send_null,
208+
start_time: start_time.to_rfc3339(),
209+
end_time: end_time.to_rfc3339(),
210+
};
211+
212+
Some(q)
213+
}
214+
186215
#[derive(Debug, thiserror::Error)]
187216
pub enum QueryError {
188217
#[error("Query cannot be empty")]
@@ -207,6 +236,8 @@ pub enum QueryError {
207236
Datafusion(#[from] DataFusionError),
208237
#[error("Execution Error: {0}")]
209238
Execute(#[from] ExecuteError),
239+
#[error("Query Error: {0}")]
240+
Custom(String),
210241
}
211242

212243
impl actix_web::ResponseError for QueryError {

server/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ async fn main() -> anyhow::Result<()> {
6868
};
6969

7070
// MODE == Query / Ingest and storage = local-store
71-
server.validate()?;
71+
// server.validate()?;
72+
7273
server.init().await?;
7374

7475
Ok(())

server/src/query.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi
3333
use datafusion::prelude::*;
3434
use itertools::Itertools;
3535
use once_cell::sync::Lazy;
36+
use serde_json::Value;
3637
use std::collections::HashMap;
3738
use std::path::{Path, PathBuf};
3839
use std::sync::Arc;
@@ -102,11 +103,18 @@ impl Query {
102103
SessionContext::new_with_state(state)
103104
}
104105

105-
pub async fn execute(&self) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
106+
pub async fn execute(&self, mem: Option<Vec<Value>>) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
106107
let df = QUERY_SESSION
107108
.execute_logical_plan(self.final_logical_plan())
108109
.await?;
109110

111+
let schema = df.schema();
112+
if let Some(mem) = mem {
113+
let mem = arrow_json::ReaderBuilder::new(schema.clone())
114+
.build(mem.iter().map(|x| serde_json::to_string(x).unwrap()).collect());
115+
116+
}
117+
110118
let fields = df
111119
.schema()
112120
.fields()

0 commit comments

Comments
 (0)