Skip to content

Commit eaa53b5

Browse files
committed
Query Server Impl WIP
1 parent 0b9a277 commit eaa53b5

File tree

7 files changed

+82
-26
lines changed

7 files changed

+82
-26
lines changed

server/src/handlers/http.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
*/
1818

1919
use actix_cors::Cors;
20+
use serde_json::Value;
21+
22+
use self::{modal::query_server::QueryServer, query::Query};
2023

2124
pub(crate) mod about;
2225
pub(crate) mod health_check;
@@ -54,3 +57,26 @@ pub(crate) fn cross_origin_config() -> Cors {
5457
Cors::default().block_on_origin_mismatch(false)
5558
}
5659
}
60+
61+
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
62+
// send the query request to the ingestor
63+
let mut res = vec![];
64+
let ima = QueryServer::get_ingestor_info().await.unwrap();
65+
66+
for im in ima {
67+
let uri = format!("{}{}/{}",im.domain_name, base_path(), "query");
68+
let reqw = reqwest::Client::new()
69+
.post(uri)
70+
.json(query)
71+
.basic_auth("admin", Some("admin"))
72+
.send()
73+
.await?;
74+
75+
if reqw.status().is_success() {
76+
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
77+
res.push(v);
78+
}
79+
}
80+
81+
Ok(res)
82+
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ impl IngestServer {
122122
config
123123
.service(
124124
// Base path "{url}/api/v1"
125-
web::scope(&base_path()).service(Server::get_ingest_factory()),
125+
web::scope(&base_path())
126+
.service(Server::get_query_factory())
127+
.service(Server::get_ingest_factory()),
126128
)
127129
.service(Server::get_liveness_factory())
128130
.service(Server::get_readiness_factory())

server/src/handlers/http/modal/query_server.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use actix_web::web;
2323
use actix_web::web::ServiceConfig;
2424
use actix_web::{App, HttpServer};
2525
use async_trait::async_trait;
26+
use chrono::Utc;
2627
use itertools::Itertools;
2728
use relative_path::RelativePathBuf;
2829
use std::sync::Arc;
@@ -49,11 +50,12 @@ impl ParseableServer for QueryServer {
4950
prometheus: actix_web_prometheus::PrometheusMetrics,
5051
oidc_client: Option<crate::oidc::OpenidConfig>,
5152
) -> anyhow::Result<()> {
53+
5254
let data = Self::get_ingestor_info().await?;
5355

5456
// on subsequent runs, the qurier should check if the ingestor is up and running or not
5557
for ingester in data.iter() {
56-
// dbg!(&ingester);
58+
dbg!(&ingester);
5759
// yes the format macro does not need the '/' ingester.origin already
5860
// has '/' because Url::Parse will add it if it is not present
5961
// uri should be something like `http://address/api/v1/liveness`
@@ -148,7 +150,7 @@ impl QueryServer {
148150
}
149151

150152
// update the .query.json file and return the new IngesterMetadataArr
151-
async fn get_ingestor_info() -> anyhow::Result<IngesterMetadataArr> {
153+
pub async fn get_ingestor_info() -> anyhow::Result<IngesterMetadataArr> {
152154
let store = CONFIG.storage().get_object_store();
153155

154156
let root_path = RelativePathBuf::from("");
@@ -165,7 +167,7 @@ impl QueryServer {
165167

166168
let mut f = Self::get_meta_file().await;
167169
// writer the arr in f
168-
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
170+
let write_size = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
169171
Ok(arr)
170172
}
171173

@@ -182,8 +184,9 @@ impl QueryServer {
182184
/// initialize the server, run migrations as needed and start the server
183185
async fn initialize(&self) -> anyhow::Result<()> {
184186
migration::run_metadata_migration(&CONFIG).await?;
185-
let metadata = storage::resolve_parseable_metadata().await?;
186187
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
188+
189+
let metadata = storage::resolve_parseable_metadata().await?;
187190
banner::print(&CONFIG, &metadata).await;
188191

189192
// initialize the rbac map
@@ -216,20 +219,19 @@ impl QueryServer {
216219
}
217220

218221
// spawn the sync thread
219-
// tokio::spawn(Self::sync_ingestor_metadata());
222+
//tokio::spawn(Self::sync_ingestor_metadata());
220223

221224
self.start(prometheus, CONFIG.parseable.openid.clone())
222225
.await?;
223226

224227
Ok(())
225228
}
226229

227-
#[allow(dead_code)]
228230
async fn sync_ingestor_metadata() {
229231
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60 / 10));
230232
loop {
231233
interval.tick().await;
232-
// dbg!("Tick");
234+
dbg!("Tick");
233235
Self::get_ingestor_info().await.unwrap();
234236
}
235237
}

server/src/handlers/http/query.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
22-
use chrono::{DateTime, Utc};
22+
use chrono::{DateTime, Timelike, Utc};
2323
use datafusion::error::DataFusionError;
2424
use datafusion::execution::context::SessionState;
2525
use futures_util::Future;
@@ -36,8 +36,10 @@ use crate::rbac::Users;
3636
use crate::response::QueryResponse;
3737
use crate::utils::actix::extract_session_key_from_req;
3838

39+
use super::send_query_request_to_ingestor;
40+
3941
/// Query Request through http endpoint.
40-
#[derive(Debug, serde::Deserialize)]
42+
#[derive(Debug, serde::Deserialize, serde::Serialize)]
4143
#[serde(rename_all = "camelCase")]
4244
pub struct Query {
4345
query: String,
@@ -52,6 +54,15 @@ pub struct Query {
5254
}
5355

5456
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
57+
// create a new query to send to the ingestors
58+
let mut mmem= vec![];
59+
if let Some(query) = transform_query_for_ingestor(&query_request) {
60+
mmem = send_query_request_to_ingestor(&query)
61+
.await
62+
.map_err(|err| QueryError::Custom(err.to_string()))?;
63+
}
64+
65+
// let rbj = arrow_json::ReaderBuilder::new();
5566
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
5667
let permissions = Users.get_permissions(&creds);
5768
let session_state = QUERY_SESSION.state();
@@ -94,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
94105

95106
let time = Instant::now();
96107

97-
let (records, fields) = query.execute().await?;
108+
let (records, fields) = query.execute(Some(mmem)).await?;
98109
let response = QueryResponse {
99110
records,
100111
fields,
@@ -183,6 +194,24 @@ async fn into_query(
183194
})
184195
}
185196

197+
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
198+
let end_time = DateTime::parse_from_rfc3339(&query.end_time).ok()?;
199+
let start_time = end_time - chrono::Duration::minutes(1);
200+
201+
dbg!(start_time.minute());
202+
203+
let q = Query {
204+
query: query.query.clone(),
205+
fields: query.fields,
206+
filter_tags: query.filter_tags.clone(),
207+
send_null: query.send_null,
208+
start_time: start_time.to_rfc3339(),
209+
end_time: end_time.to_rfc3339(),
210+
};
211+
212+
Some(q)
213+
}
214+
186215
#[derive(Debug, thiserror::Error)]
187216
pub enum QueryError {
188217
#[error("Query cannot be empty")]
@@ -207,6 +236,8 @@ pub enum QueryError {
207236
Datafusion(#[from] DataFusionError),
208237
#[error("Execution Error: {0}")]
209238
Execute(#[from] ExecuteError),
239+
#[error("Query Error: {0}")]
240+
Custom(String),
210241
}
211242

212243
impl actix_web::ResponseError for QueryError {

server/src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ async fn main() -> anyhow::Result<()> {
6969

7070
// add logic for graceful shutdown if
7171
// MODE == Query / Ingest and storage = local-store
72-
// option.rs ln: 161
73-
// CONFIG.run_time_mode_validation()?;
72+
// server.validate()?;
7473

7574
server.init().await?;
7675

server/src/option.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -157,18 +157,6 @@ impl Config {
157157
}
158158
"S3 bucket"
159159
}
160-
161-
#[allow(dead_code)]
162-
pub fn run_time_mode_validation(&self) -> anyhow::Result<()> {
163-
let check = (self.parseable.mode == Mode::Ingest || self.parseable.mode == Mode::Query)
164-
&& self.storage_name == "drive";
165-
166-
if check {
167-
anyhow::bail!(format!("Cannot start the server in {} mode with local storage, please use S3 bucket for storage", self.parseable.mode.to_str()))
168-
}
169-
170-
Ok(())
171-
}
172160
}
173161

174162
fn create_parseable_cli_command() -> Command {

server/src/query.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi
3333
use datafusion::prelude::*;
3434
use itertools::Itertools;
3535
use once_cell::sync::Lazy;
36+
use serde_json::Value;
3637
use std::collections::HashMap;
3738
use std::path::{Path, PathBuf};
3839
use std::sync::Arc;
@@ -102,11 +103,18 @@ impl Query {
102103
SessionContext::new_with_state(state)
103104
}
104105

105-
pub async fn execute(&self) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
106+
pub async fn execute(&self, mem: Option<Vec<Value>>) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
106107
let df = QUERY_SESSION
107108
.execute_logical_plan(self.final_logical_plan())
108109
.await?;
109110

111+
let schema = df.schema();
112+
if let Some(mem) = mem {
113+
let mem = arrow_json::ReaderBuilder::new(schema.clone())
114+
.build(mem.iter().map(|x| serde_json::to_string(x).unwrap()).collect());
115+
116+
}
117+
110118
let fields = df
111119
.schema()
112120
.fields()

0 commit comments

Comments
 (0)