Skip to content

Commit a54f2d6

Browse files
authored
Rm staging query result (#740)
* Remove staging query from the query result (for distributed mode)
* Refactor get_schema method to handle a missing schema in object storage
1 parent d7fcf01 commit a54f2d6

File tree

5 files changed

+32
-34
lines changed

5 files changed

+32
-34
lines changed

server/src/handlers/http.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
9090
Ok(new_schema)
9191
}
9292

93+
/// unused for now, might need it later
94+
#[allow(unused)]
9395
pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
9496
// send the query request to the ingester
9597
let mut res = vec![];

server/src/handlers/http/query.rs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ use crate::storage::object_storage::commit_schema_to_storage;
4545
use crate::storage::ObjectStorageError;
4646
use crate::utils::actix::extract_session_key_from_req;
4747

48-
use super::send_query_request_to_ingester;
49-
5048
/// Query Request through http endpoint.
5149
#[derive(Debug, serde::Deserialize, serde::Serialize)]
5250
#[serde(rename_all = "camelCase")]
@@ -85,21 +83,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
8583

8684
let mut query = into_query(&query_request, &session_state).await?;
8785

88-
// ? run this code only if the query start time and now is less than 1 minute + margin
89-
let mmem = if CONFIG.parseable.mode == Mode::Query {
90-
// create a new query to send to the ingesters
91-
if let Some(que) = transform_query_for_ingester(&query_request) {
92-
let vals = send_query_request_to_ingester(&que)
93-
.await
94-
.map_err(|err| QueryError::Custom(err.to_string()))?;
95-
Some(vals)
96-
} else {
97-
None
98-
}
99-
} else {
100-
None
101-
};
102-
10386
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
10487
let permissions = Users.get_permissions(&creds);
10588

@@ -147,7 +130,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
147130
fill_null: query_request.send_null,
148131
with_fields: query_request.fields,
149132
}
150-
.to_http(mmem);
133+
.to_http();
151134

152135
if let Some(table) = table_name {
153136
let time = time.elapsed().as_secs_f64();
@@ -229,6 +212,8 @@ async fn into_query(
229212
})
230213
}
231214

215+
/// unused for now, might need it in the future
216+
#[allow(unused)]
232217
fn transform_query_for_ingester(query: &Query) -> Option<Query> {
233218
if query.query.is_empty() {
234219
return None;
@@ -288,8 +273,6 @@ pub enum QueryError {
288273
Datafusion(#[from] DataFusionError),
289274
#[error("Execution Error: {0}")]
290275
Execute(#[from] ExecuteError),
291-
#[error("Error: {0}")]
292-
Custom(String),
293276
#[error("ObjectStorage Error: {0}")]
294277
ObjectStorage(#[from] ObjectStorageError),
295278
#[error("Evern Error: {0}")]

server/src/query.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
336336
.unwrap()
337337
}
338338

339+
/// unused for now, might need it later
340+
#[allow(unused)]
339341
pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
340342
if objects.is_empty() {
341343
return objects;

server/src/response.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ use datafusion::arrow::record_batch::RecordBatch;
2222
use itertools::Itertools;
2323
use serde_json::{json, Value};
2424

25-
use crate::query::flatten_objects_for_count;
26-
2725
pub struct QueryResponse {
2826
pub records: Vec<RecordBatch>,
2927
pub fields: Vec<String>,
@@ -32,7 +30,7 @@ pub struct QueryResponse {
3230
}
3331

3432
impl QueryResponse {
35-
pub fn to_http(&self, imem: Option<Vec<Value>>) -> impl Responder {
33+
pub fn to_http(&self) -> impl Responder {
3634
log::info!("{}", "Returning query results");
3735
let records: Vec<&RecordBatch> = self.records.iter().collect();
3836
let mut json_records = record_batches_to_json_rows(&records).unwrap();
@@ -45,13 +43,7 @@ impl QueryResponse {
4543
}
4644
}
4745
}
48-
let mut values = json_records.into_iter().map(Value::Object).collect_vec();
49-
50-
if let Some(mut imem) = imem {
51-
values.append(&mut imem);
52-
}
53-
54-
let values = flatten_objects_for_count(values);
46+
let values = json_records.into_iter().map(Value::Object).collect_vec();
5547

5648
let response = if self.with_fields {
5749
json!({

server/src/storage/object_storage.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,29 @@ pub trait ObjectStorage: Sync + 'static {
204204
&self,
205205
stream_name: &str,
206206
) -> Result<Schema, ObjectStorageError> {
207-
let schema_path =
208-
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
209-
let schema_map = self.get_object(&schema_path).await?;
210-
Ok(serde_json::from_slice(&schema_map)?)
207+
// try to get my schema
208+
// if fails get the base schema
209+
// put the schema to storage??
210+
let schema_path = schema_path(stream_name);
211+
let byte_data = match self.get_object(&schema_path).await {
212+
Ok(bytes) => bytes,
213+
Err(err) => {
214+
log::info!("{:?}", err);
215+
// base schema path
216+
let schema_path = RelativePathBuf::from_iter([
217+
stream_name,
218+
STREAM_ROOT_DIRECTORY,
219+
SCHEMA_FILE_NAME,
220+
]);
221+
let data = self.get_object(&schema_path).await?;
222+
// schema was not found in store, so it needs to be placed
223+
self.put_schema(stream_name, &serde_json::from_slice(&data).unwrap())
224+
.await?;
225+
226+
data
227+
}
228+
};
229+
Ok(serde_json::from_slice(&byte_data)?)
211230
}
212231

213232
async fn get_schema(&self, stream_name: &str) -> Result<Schema, ObjectStorageError> {

0 commit comments

Comments
 (0)