
Commit aa6927f
Merge Changes from stratchpad
1 parent 15605b3

5 files changed (+45, -20 lines)

server/src/handlers/http.rs

Lines changed: 1 addition & 0 deletions
@@ -59,6 +59,7 @@ pub fn base_path_without_preceding_slash() -> String {
     base_path().trim_start_matches('/').to_string()
 }

+
 pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
     let mut res = vec![];
     let ima = QueryServer::get_ingester_info().await.unwrap();

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 8 additions & 0 deletions
@@ -175,6 +175,14 @@ impl IngestServer {
                )
                .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
        )
+        .service(
+            // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
+            web::resource("/schema").route(
+                web::get()
+                    .to(logstream::schema)
+                    .authorize_for_stream(Action::GetSchema),
+            ),
+        )
        .service(
            // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
            web::resource("/stats").route(

server/src/handlers/http/query.rs

Lines changed: 33 additions & 15 deletions
@@ -19,7 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, Responder};
-use arrow_schema::Schema;
+use arrow_json::reader::infer_json_schema_from_iterator;
 use chrono::{DateTime, Utc};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
@@ -30,8 +30,13 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;

-use crate::event::commit_schema;
+// Eshan's Code Under test
+#[allow(unused_imports)]
+use arrow_schema::Schema;
+#[allow(unused_imports)]
 use crate::handlers::http::send_schema_request;
+
+use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::{Mode, CONFIG};
 use crate::query::error::ExecuteError;
@@ -63,28 +68,41 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
     let session_state = QUERY_SESSION.state();
     let mut query = into_query(&query_request, &session_state).await?;

-    if CONFIG.parseable.mode == Mode::Query {
-        if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
-            let new_schema =
-                Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;
+    // if CONFIG.parseable.mode == Mode::Query {
+    //     if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
+    //         let new_schema =
+    //             Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;

-            commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
-                .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
+    //         commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
+    //             .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;

-            commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
-                .await
-                .map_err(|err| {
-                    QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
-                })?;
-        }
-    }
+    //         commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
+    //             .await
+    //             .map_err(|err| {
+    //                 QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
+    //             })?;
+    //     }
+    // }

     let mmem = if CONFIG.parseable.mode == Mode::Query {
         // create a new query to send to the ingestors
         if let Some(que) = transform_query_for_ingestor(&query_request) {
             let vals = send_request_to_ingestor(&que)
                 .await
                 .map_err(|err| QueryError::Custom(err.to_string()))?;
+            let infered_schema = infer_json_schema_from_iterator(vals.iter().map(Ok)).map_err(|err| {
+                QueryError::Custom(format!("Error inferring schema from iterator\nError:{err}"))
+            })?;
+
+            commit_schema(&query.table_name().unwrap(), Arc::new(infered_schema.clone()))
+                .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
+
+            commit_schema_to_storage(&query.table_name().unwrap(), infered_schema)
+                .await
+                .map_err(|err| {
+                    QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
+                })?;
+
             Some(vals)
         } else {
             None
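
The key change in this file: instead of asking each ingester for its schema via send_schema_request and merging the results with Schema::try_merge, the query node now infers an Arrow schema directly from the JSON rows the ingestors return and commits that. A minimal, standalone sketch of the inference step (assuming the arrow-json and serde_json crates; the sample rows are made up):

use arrow_json::reader::infer_json_schema_from_iterator;
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-ins for the `vals` returned by send_request_to_ingestor.
    let vals = vec![
        json!({"level": "info", "status": 200}),
        json!({"level": "error", "status": 500, "message": "boom"}),
    ];

    // Same call shape as in the diff: the iterator yields Result<&Value, ArrowError>.
    let inferred = infer_json_schema_from_iterator(vals.iter().map(Ok))?;

    // Fields are unioned across rows, so `message` shows up as a nullable Utf8 column.
    for field in inferred.fields() {
        println!("{}: {:?}", field.name(), field.data_type());
    }
    Ok(())
}

One trade-off worth noting: a schema inferred only from the rows a given query happens to return can be narrower than the stream's full schema, which may be why the earlier merge-based path is left commented out rather than deleted.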

server/src/query.rs

Lines changed: 2 additions & 4 deletions
@@ -51,6 +51,7 @@ pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(CONFIG.storage()));

 // A query request by client
+#[derive(Debug)]
 pub struct Query {
     pub raw_logical_plan: LogicalPlan,
     pub start: DateTime<Utc>,
@@ -102,10 +103,7 @@ impl Query {
         SessionContext::new_with_state(state)
     }

-    pub async fn execute(
-        &self,
-        mem: Option<Vec<Value>>,
-    ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+    pub async fn execute(&self) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
         let df = QUERY_SESSION
             .execute_logical_plan(self.final_logical_plan())
             .await?;

server/src/response.rs

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ pub struct QueryResponse {
 }

 impl QueryResponse {
-    pub fn to_http(&self) -> impl Responder {
+    pub fn to_http(&self, imem: Option<Vec<Value>>) -> impl Responder {
         log::info!("{}", "Returning query results");
         let records: Vec<&RecordBatch> = self.records.iter().collect();
         let mut json_records = record_batches_to_json_rows(&records).unwrap();
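
The diff only shows the signature change; how `imem` is consumed inside to_http is not visible here. One plausible reading, given that the query handler now passes the ingestors' in-memory JSON rows alongside the record batches, is that those rows get appended to the JSON output. A hedged sketch of that merging step (the helper name and logic are assumptions, not the commit's code):

use serde_json::{Map, Value};

// Hypothetical helper: append in-memory JSON rows from the ingestors to the
// rows produced by record_batches_to_json_rows. Non-object values are skipped.
fn merge_rows(
    mut json_records: Vec<Map<String, Value>>,
    imem: Option<Vec<Value>>,
) -> Vec<Map<String, Value>> {
    if let Some(rows) = imem {
        for row in rows {
            if let Value::Object(obj) = row {
                json_records.push(obj);
            }
        }
    }
    json_records
}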
