Skip to content

Commit e4a3430

Browse files
committed
feat: Impl Query Server
1. send_request_ingest_server to return proper result 2. fix: bug if there is no parquet files in store 3. Query Server Now gets all the resultant data
1 parent 9fda908 commit e4a3430

File tree

6 files changed

+125
-38
lines changed

6 files changed

+125
-38
lines changed

server/src/handlers/http.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,51 @@ pub(crate) fn cross_origin_config() -> Cors {
5555
}
5656
}
5757

58-
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
59-
// send the query request to the ingestor
58+
pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
6059
let mut res = vec![];
61-
let ima = QueryServer::get_ingestor_info().await.unwrap();
60+
let ima = QueryServer::get_ingester_info().await.unwrap();
6261

6362
for im in ima {
64-
let uri = format!("{}{}/{}",im.domain_name, base_path(), "query");
63+
let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name);
64+
let reqw = reqwest::Client::new()
65+
.get(uri)
66+
.header(http::header::AUTHORIZATION, im.token.clone())
67+
.header(http::header::CONTENT_TYPE, "application/json")
68+
.send()
69+
.await?;
70+
71+
if reqw.status().is_success() {
72+
let v = serde_json::from_slice(&reqw.bytes().await?)?;
73+
res.push(v);
74+
}
75+
}
76+
77+
Ok(res)
78+
}
79+
80+
pub async fn send_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
81+
// send the query request to the ingestor
82+
let mut res = vec![];
83+
let ima = QueryServer::get_ingester_info().await.unwrap();
84+
85+
for im in ima.iter() {
86+
let uri = format!("{}api/v1/{}", im.domain_name, "query");
87+
6588
let reqw = reqwest::Client::new()
6689
.post(uri)
6790
.json(query)
68-
.basic_auth("admin", Some("admin"))
91+
.header(http::header::AUTHORIZATION, im.token.clone())
92+
.header(http::header::CONTENT_TYPE, "application/json")
6993
.send()
7094
.await?;
7195

7296
if reqw.status().is_success() {
7397
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
74-
res.push(v);
98+
if let Some(arr) = v.as_array() {
99+
for val in arr {
100+
res.push(val.clone())
101+
}
102+
}
75103
}
76104
}
77105

server/src/handlers/http/modal/query_server.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl QueryServer {
158158

159159
let mut f = Self::get_meta_file().await;
160160
// writer the arr in f
161-
let write_size = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
161+
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
162162
Ok(arr)
163163
}
164164

@@ -177,9 +177,11 @@ impl QueryServer {
177177
/// initialize the server, run migrations as needed and start the server
178178
async fn initialize(&self) -> anyhow::Result<()> {
179179
migration::run_metadata_migration(&CONFIG).await?;
180-
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
181180

182181
let metadata = storage::resolve_parseable_metadata().await?;
182+
// do not commit the below line
183+
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
184+
183185
banner::print(&CONFIG, &metadata).await;
184186

185187
// initialize the rbac map

server/src/handlers/http/query.rs

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,30 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
22-
use chrono::{DateTime, Timelike, Utc};
22+
use arrow_schema::Schema;
23+
use chrono::{DateTime, Utc};
2324
use datafusion::error::DataFusionError;
2425
use datafusion::execution::context::SessionState;
2526
use futures_util::Future;
2627
use http::StatusCode;
2728
use std::collections::HashMap;
2829
use std::pin::Pin;
30+
use std::sync::Arc;
2931
use std::time::Instant;
3032

33+
use crate::event::commit_schema;
34+
use crate::handlers::http::send_schema_request;
3135
use crate::metrics::QUERY_EXECUTE_TIME;
36+
use crate::option::{Mode, CONFIG};
3237
use crate::query::error::ExecuteError;
3338
use crate::query::QUERY_SESSION;
3439
use crate::rbac::role::{Action, Permission};
3540
use crate::rbac::Users;
3641
use crate::response::QueryResponse;
42+
use crate::storage::object_storage::commit_schema_to_storage;
3743
use crate::utils::actix::extract_session_key_from_req;
3844

39-
use super::send_query_request_to_ingestor;
45+
use super::send_request_to_ingestor;
4046

4147
/// Query Request through http endpoint.
4248
#[derive(Debug, serde::Deserialize, serde::Serialize)]
@@ -54,19 +60,41 @@ pub struct Query {
5460
}
5561

5662
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
57-
// create a new query to send to the ingestors
58-
let mut mmem= vec![];
59-
if let Some(query) = transform_query_for_ingestor(&query_request) {
60-
mmem = send_query_request_to_ingestor(&query)
61-
.await
62-
.map_err(|err| QueryError::Custom(err.to_string()))?;
63+
let session_state = QUERY_SESSION.state();
64+
let mut query = into_query(&query_request, &session_state).await?;
65+
66+
if CONFIG.parseable.mode == Mode::Query {
67+
if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
68+
let new_schema =
69+
Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;
70+
71+
commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
72+
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
73+
74+
commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
75+
.await
76+
.map_err(|err| {
77+
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
78+
})?;
79+
}
6380
}
6481

65-
// let rbj = arrow_json::ReaderBuilder::new();
82+
let mmem = if CONFIG.parseable.mode == Mode::Query {
83+
// create a new query to send to the ingestors
84+
if let Some(que) = transform_query_for_ingestor(&query_request) {
85+
let vals = send_request_to_ingestor(&que)
86+
.await
87+
.map_err(|err| QueryError::Custom(err.to_string()))?;
88+
Some(vals)
89+
} else {
90+
None
91+
}
92+
} else {
93+
None
94+
};
95+
6696
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
6797
let permissions = Users.get_permissions(&creds);
68-
let session_state = QUERY_SESSION.state();
69-
let mut query = into_query(&query_request, &session_state).await?;
7098

7199
// check authorization of this query if it references physical table;
72100
let table_name = query.table_name();
@@ -105,14 +133,14 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
105133

106134
let time = Instant::now();
107135

108-
let (records, fields) = query.execute(Some(mmem)).await?;
136+
let (records, fields) = query.execute().await?;
109137
let response = QueryResponse {
110138
records,
111139
fields,
112140
fill_null: query_request.send_null,
113141
with_fields: query_request.fields,
114142
}
115-
.to_http();
143+
.to_http(mmem);
116144

117145
if let Some(table) = table_name {
118146
let time = time.elapsed().as_secs_f64();
@@ -195,14 +223,31 @@ async fn into_query(
195223
}
196224

197225
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
198-
let end_time = DateTime::parse_from_rfc3339(&query.end_time).ok()?;
199-
let start_time = end_time - chrono::Duration::minutes(1);
226+
if query.query.is_empty() {
227+
return None;
228+
}
229+
230+
if query.start_time.is_empty() {
231+
return None;
232+
}
200233

201-
dbg!(start_time.minute());
234+
if query.end_time.is_empty() {
235+
return None;
236+
}
237+
238+
let end_time: DateTime<Utc> = if query.end_time == "now" {
239+
Utc::now()
240+
} else {
241+
DateTime::parse_from_rfc3339(&query.end_time)
242+
.ok()?
243+
.with_timezone(&Utc)
244+
};
202245

246+
let start_time = end_time - chrono::Duration::minutes(1);
247+
// when transforming the query, the ingestors are forced to return an array of values
203248
let q = Query {
204249
query: query.query.clone(),
205-
fields: query.fields,
250+
fields: false,
206251
filter_tags: query.filter_tags.clone(),
207252
send_null: query.send_null,
208253
start_time: start_time.to_rfc3339(),
@@ -236,7 +281,7 @@ pub enum QueryError {
236281
Datafusion(#[from] DataFusionError),
237282
#[error("Execution Error: {0}")]
238283
Execute(#[from] ExecuteError),
239-
#[error("Query Error: {0}")]
284+
#[error("Error: {0}")]
240285
Custom(String),
241286
}
242287

server/src/query.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi
3333
use datafusion::prelude::*;
3434
use itertools::Itertools;
3535
use once_cell::sync::Lazy;
36-
use serde_json::Value;
3736
use std::collections::HashMap;
3837
use std::path::{Path, PathBuf};
3938
use std::sync::Arc;
@@ -103,18 +102,14 @@ impl Query {
103102
SessionContext::new_with_state(state)
104103
}
105104

106-
pub async fn execute(&self, mem: Option<Vec<Value>>) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
105+
pub async fn execute(
106+
&self,
107+
mem: Option<Vec<Value>>,
108+
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
107109
let df = QUERY_SESSION
108110
.execute_logical_plan(self.final_logical_plan())
109111
.await?;
110112

111-
let schema = df.schema();
112-
if let Some(mem) = mem {
113-
let mem = arrow_json::ReaderBuilder::new(schema.clone())
114-
.build(mem.iter().map(|x| serde_json::to_string(x).unwrap()).collect());
115-
116-
}
117-
118113
let fields = df
119114
.schema()
120115
.fields()
@@ -123,6 +118,10 @@ impl Query {
123118
.cloned()
124119
.collect_vec();
125120

121+
if fields.is_empty() {
122+
return Ok((vec![], fields));
123+
}
124+
126125
let results = df.collect().await?;
127126
Ok((results, fields))
128127
}

server/src/response.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,12 @@ impl QueryResponse {
4343
}
4444
}
4545
}
46-
let values = json_records.into_iter().map(Value::Object).collect_vec();
46+
let mut values = json_records.into_iter().map(Value::Object).collect_vec();
47+
48+
if let Some(mut imem) = imem {
49+
values.append(&mut imem);
50+
}
51+
4752
let response = if self.with_fields {
4853
json!({
4954
"fields": self.fields,

server/src/storage/object_storage.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ pub trait ObjectStorage: Sync + 'static {
432432
fn get_bucket_name(&self) -> String;
433433
}
434434

435-
async fn commit_schema_to_storage(
435+
pub async fn commit_schema_to_storage(
436436
stream_name: &str,
437437
schema: Schema,
438438
) -> Result<(), ObjectStorageError> {
@@ -451,7 +451,15 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
451451

452452
#[inline(always)]
453453
fn schema_path(stream_name: &str) -> RelativePathBuf {
454-
RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])
454+
match CONFIG.parseable.mode {
455+
Mode::Ingest => {
456+
let (ip, port) = get_address();
457+
let file_name = format!("ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME);
458+
459+
RelativePathBuf::from_iter([stream_name, &file_name])
460+
}
461+
Mode::All | Mode::Query => RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]),
462+
}
455463
}
456464

457465
#[inline(always)]

0 commit comments

Comments
 (0)