Skip to content

Commit 8717a26

Browse files
authored
feat: Impl Query Server Changes (#701)
* feat: Impl Query Server Add: Schema API for Ingestion Server RM: Redundant API Endpoints feat: Last min data is Queryable Add: Unit Tests * fix: func flatten_objects_for_count Fix function flatten_objects_for_count Update Tests * fix: base_path_without_preceding_slash function * fix: Session Context not in sync 1. Session Context was not being synced at the time of schema update 2. Make struct TableScanVisitor pub at crate level
1 parent ec707ab commit 8717a26

File tree

7 files changed

+325
-54
lines changed

7 files changed

+325
-54
lines changed

server/src/handlers/http.rs

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
*/
1818

1919
use actix_cors::Cors;
20+
use arrow_schema::Schema;
21+
use serde_json::Value;
22+
23+
use self::{modal::query_server::QueryServer, query::Query};
2024

2125
pub(crate) mod about;
2226
pub(crate) mod health_check;
@@ -33,11 +37,11 @@ pub(crate) mod rbac;
3337
pub(crate) mod role;
3438

3539
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
36-
pub const API_BASE_PATH: &str = "/api";
40+
pub const API_BASE_PATH: &str = "api";
3741
pub const API_VERSION: &str = "v1";
3842

3943
pub(crate) fn base_path() -> String {
40-
format!("{API_BASE_PATH}/{API_VERSION}")
44+
format!("/{API_BASE_PATH}/{API_VERSION}")
4145
}
4246

4347
pub fn metrics_path() -> String {
@@ -53,5 +57,69 @@ pub(crate) fn cross_origin_config() -> Cors {
5357
}
5458

5559
pub fn base_path_without_preceding_slash() -> String {
56-
base_path().trim_start_matches('/').to_string()
60+
format!("{API_BASE_PATH}/{API_VERSION}")
61+
}
62+
63+
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
64+
let mut res = vec![];
65+
let ima = QueryServer::get_ingester_info().await.unwrap();
66+
67+
for im in ima {
68+
let uri = format!(
69+
"{}{}/logstream/{}/schema",
70+
im.domain_name,
71+
base_path_without_preceding_slash(),
72+
stream_name
73+
);
74+
let reqw = reqwest::Client::new()
75+
.get(uri)
76+
.header(http::header::AUTHORIZATION, im.token.clone())
77+
.header(http::header::CONTENT_TYPE, "application/json")
78+
.send()
79+
.await?;
80+
81+
if reqw.status().is_success() {
82+
let v = serde_json::from_slice(&reqw.bytes().await?)?;
83+
res.push(v);
84+
}
85+
}
86+
87+
let new_schema = Schema::try_merge(res)?;
88+
89+
Ok(new_schema)
90+
}
91+
92+
pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
93+
// send the query request to the ingester
94+
let mut res = vec![];
95+
let ima = QueryServer::get_ingester_info().await.unwrap();
96+
97+
for im in ima.iter() {
98+
let uri = format!(
99+
"{}{}/{}",
100+
im.domain_name,
101+
base_path_without_preceding_slash(),
102+
"query"
103+
);
104+
let reqw = reqwest::Client::new()
105+
.post(uri)
106+
.json(query)
107+
.header(http::header::AUTHORIZATION, im.token.clone())
108+
.header(http::header::CONTENT_TYPE, "application/json")
109+
.send()
110+
.await?;
111+
112+
if reqw.status().is_success() {
113+
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
114+
// the value returned is an array of json objects
115+
// so it needs to be flattened
116+
if let Some(arr) = v.as_array() {
117+
for val in arr {
118+
res.push(val.to_owned())
119+
}
120+
}
121+
}
122+
}
123+
124+
Ok(res)
57125
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -126,29 +126,7 @@ impl IngestServer {
126126
.service(Server::get_about_factory()),
127127
)
128128
.service(Server::get_liveness_factory())
129-
.service(Server::get_readiness_factory())
130-
.service(Self::get_metrics_webscope());
131-
}
132-
133-
fn get_metrics_webscope() -> Scope {
134-
web::scope("/logstream").service(
135-
web::scope("/{logstream}")
136-
.service(
137-
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
138-
web::resource("/schema").route(
139-
web::get()
140-
.to(logstream::schema)
141-
.authorize_for_stream(Action::GetSchema),
142-
),
143-
)
144-
.service(
145-
web::resource("/stats").route(
146-
web::get()
147-
.to(logstream::get_stats)
148-
.authorize_for_stream(Action::GetStats),
149-
),
150-
),
151-
)
129+
.service(Server::get_readiness_factory());
152130
}
153131

154132
fn logstream_api() -> Scope {
@@ -176,6 +154,14 @@ impl IngestServer {
176154
)
177155
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
178156
)
157+
.service(
158+
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
159+
web::resource("/schema").route(
160+
web::get()
161+
.to(logstream::schema)
162+
.authorize_for_stream(Action::GetSchema),
163+
),
164+
)
179165
.service(
180166
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
181167
web::resource("/stats").route(

server/src/handlers/http/modal/query_server.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,8 @@ use relative_path::RelativePathBuf;
3535
use reqwest::Response;
3636
use serde::{Deserialize, Serialize};
3737
use std::sync::Arc;
38-
use tokio::io::AsyncWriteExt;
3938
use url::Url;
4039

41-
use tokio::fs::File as TokioFile;
42-
4340
use crate::option::CONFIG;
4441

4542
use super::server::Server;
@@ -170,10 +167,6 @@ impl QueryServer {
170167

171168
// TODO: add validation logic here
172169
// validate the ingester metadata
173-
174-
let mut f = Self::get_meta_file().await;
175-
// writer the arr in f
176-
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
177170
Ok(arr)
178171
}
179172

@@ -224,8 +217,11 @@ impl QueryServer {
224217
/// initialize the server, run migrations as needed and start the server
225218
async fn initialize(&self) -> anyhow::Result<()> {
226219
migration::run_metadata_migration(&CONFIG).await?;
220+
227221
let metadata = storage::resolve_parseable_metadata().await?;
222+
// do not commit the below line
228223
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
224+
229225
banner::print(&CONFIG, &metadata).await;
230226

231227
// initialize the rbac map
@@ -276,17 +272,6 @@ impl QueryServer {
276272
}
277273
}
278274

279-
async fn get_meta_file() -> TokioFile {
280-
let meta_path = CONFIG.staging_dir().join(".query.json");
281-
282-
tokio::fs::OpenOptions::new()
283-
.read(true)
284-
.write(true)
285-
.open(meta_path)
286-
.await
287-
.unwrap()
288-
}
289-
290275
// forward the request to all ingesters to keep them in sync
291276
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
292277
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
@@ -324,6 +309,7 @@ impl QueryServer {
324309
stream_name
325310
);
326311

312+
// roll back the stream creation
327313
Self::send_stream_rollback_request(&url, ingester.clone()).await?;
328314
}
329315

@@ -337,6 +323,7 @@ impl QueryServer {
337323
Ok(())
338324
}
339325

326+
/// get the cumulative stats from all ingesters
340327
pub async fn fetch_stats_from_ingesters(
341328
stream_name: &str,
342329
) -> Result<QueriedStats, StreamError> {
@@ -391,6 +378,7 @@ impl QueryServer {
391378
Ok(stats)
392379
}
393380

381+
/// send a request to the ingester to fetch its stats
394382
async fn send_stats_request(
395383
url: &str,
396384
ingester: IngesterMetadata,
@@ -488,6 +476,7 @@ impl QueryServer {
488476
Ok(())
489477
}
490478

479+
/// send a rollback request to all ingesters
491480
async fn send_stream_rollback_request(
492481
url: &str,
493482
ingester: IngesterMetadata,
@@ -504,6 +493,7 @@ impl QueryServer {
504493
.send()
505494
.await
506495
.map_err(|err| {
496+
// log the error and return a custom error
507497
log::error!(
508498
"Fatal: failed to rollback stream creation: {}\n Error: {:?}",
509499
ingester.domain_name,
@@ -518,6 +508,8 @@ impl QueryServer {
518508
}
519509
})?;
520510

511+
// if the response is not successful, log the error and return a custom error
512+
// this could be a bit too much, but we need to be sure it covers all cases
521513
if !resp.status().is_success() {
522514
log::error!(
523515
"failed to rollback stream creation: {}\nResponse Returned: {:?}",

server/src/handlers/http/query.rs

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,33 @@ use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
2222
use chrono::{DateTime, Utc};
23+
use datafusion::common::tree_node::TreeNode;
2324
use datafusion::error::DataFusionError;
2425
use datafusion::execution::context::SessionState;
2526
use futures_util::Future;
2627
use http::StatusCode;
2728
use std::collections::HashMap;
2829
use std::pin::Pin;
30+
use std::sync::Arc;
2931
use std::time::Instant;
3032

33+
use crate::handlers::http::fetch_schema;
34+
35+
use crate::event::commit_schema;
3136
use crate::metrics::QUERY_EXECUTE_TIME;
37+
use crate::option::{Mode, CONFIG};
3238
use crate::query::error::ExecuteError;
33-
use crate::query::QUERY_SESSION;
39+
use crate::query::{TableScanVisitor, QUERY_SESSION};
3440
use crate::rbac::role::{Action, Permission};
3541
use crate::rbac::Users;
3642
use crate::response::QueryResponse;
43+
use crate::storage::object_storage::commit_schema_to_storage;
3744
use crate::utils::actix::extract_session_key_from_req;
3845

46+
use super::send_query_request_to_ingester;
47+
3948
/// Query Request through http endpoint.
40-
#[derive(Debug, serde::Deserialize)]
49+
#[derive(Debug, serde::Deserialize, serde::Serialize)]
4150
#[serde(rename_all = "camelCase")]
4251
pub struct Query {
4352
query: String,
@@ -52,11 +61,49 @@ pub struct Query {
5261
}
5362

5463
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
55-
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
56-
let permissions = Users.get_permissions(&creds);
5764
let session_state = QUERY_SESSION.state();
65+
66+
// get the logical plan and extract the table name
67+
let raw_logical_plan = session_state
68+
.create_logical_plan(&query_request.query)
69+
.await?;
70+
// create a visitor to extract the table name
71+
let mut visitor = TableScanVisitor::default();
72+
let _ = raw_logical_plan.visit(&mut visitor);
73+
let table_name = visitor.into_inner().pop().unwrap();
74+
75+
if CONFIG.parseable.mode == Mode::Query {
76+
if let Ok(new_schema) = fetch_schema(&table_name).await {
77+
commit_schema_to_storage(&table_name, new_schema.clone())
78+
.await
79+
.map_err(|err| {
80+
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
81+
})?;
82+
commit_schema(&table_name, Arc::new(new_schema))
83+
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
84+
}
85+
}
86+
5887
let mut query = into_query(&query_request, &session_state).await?;
5988

89+
// ? run this code only if the query start time and now is less than 1 minute + margin
90+
let mmem = if CONFIG.parseable.mode == Mode::Query {
91+
// create a new query to send to the ingesters
92+
if let Some(que) = transform_query_for_ingester(&query_request) {
93+
let vals = send_query_request_to_ingester(&que)
94+
.await
95+
.map_err(|err| QueryError::Custom(err.to_string()))?;
96+
Some(vals)
97+
} else {
98+
None
99+
}
100+
} else {
101+
None
102+
};
103+
104+
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
105+
let permissions = Users.get_permissions(&creds);
106+
60107
// check authorization of this query if it references physical table;
61108
let table_name = query.table_name();
62109
if let Some(ref table) = table_name {
@@ -101,7 +148,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
101148
fill_null: query_request.send_null,
102149
with_fields: query_request.fields,
103150
}
104-
.to_http();
151+
.to_http(mmem);
105152

106153
if let Some(table) = table_name {
107154
let time = time.elapsed().as_secs_f64();
@@ -183,6 +230,41 @@ async fn into_query(
183230
})
184231
}
185232

233+
fn transform_query_for_ingester(query: &Query) -> Option<Query> {
234+
if query.query.is_empty() {
235+
return None;
236+
}
237+
238+
if query.start_time.is_empty() {
239+
return None;
240+
}
241+
242+
if query.end_time.is_empty() {
243+
return None;
244+
}
245+
246+
let end_time: DateTime<Utc> = if query.end_time == "now" {
247+
Utc::now()
248+
} else {
249+
DateTime::parse_from_rfc3339(&query.end_time)
250+
.ok()?
251+
.with_timezone(&Utc)
252+
};
253+
254+
let start_time = end_time - chrono::Duration::minutes(1);
255+
// when transforming the query, the ingesters are forced to return an array of values
256+
let q = Query {
257+
query: query.query.clone(),
258+
fields: false,
259+
filter_tags: query.filter_tags.clone(),
260+
send_null: query.send_null,
261+
start_time: start_time.to_rfc3339(),
262+
end_time: end_time.to_rfc3339(),
263+
};
264+
265+
Some(q)
266+
}
267+
186268
#[derive(Debug, thiserror::Error)]
187269
pub enum QueryError {
188270
#[error("Query cannot be empty")]
@@ -207,6 +289,8 @@ pub enum QueryError {
207289
Datafusion(#[from] DataFusionError),
208290
#[error("Execution Error: {0}")]
209291
Execute(#[from] ExecuteError),
292+
#[error("Error: {0}")]
293+
Custom(String),
210294
}
211295

212296
impl actix_web::ResponseError for QueryError {

0 commit comments

Comments
 (0)