Skip to content

Commit d3066fb

Browse files
committed
fix: Session Context not in sync
1. Session Context was not being synced at the time of schema update 2. Make struct TableScanVisitor pub at crate level
1 parent f6f76cf commit d3066fb

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

server/src/handlers/http/query.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
2222
use chrono::{DateTime, Utc};
23+
use datafusion::common::tree_node::TreeNode;
2324
use datafusion::error::DataFusionError;
2425
use datafusion::execution::context::SessionState;
2526
use futures_util::Future;
@@ -35,7 +36,7 @@ use crate::event::commit_schema;
3536
use crate::metrics::QUERY_EXECUTE_TIME;
3637
use crate::option::{Mode, CONFIG};
3738
use crate::query::error::ExecuteError;
38-
use crate::query::QUERY_SESSION;
39+
use crate::query::{TableScanVisitor, QUERY_SESSION};
3940
use crate::rbac::role::{Action, Permission};
4041
use crate::rbac::Users;
4142
use crate::response::QueryResponse;
@@ -61,20 +62,29 @@ pub struct Query {
6162

6263
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
6364
let session_state = QUERY_SESSION.state();
64-
let mut query = into_query(&query_request, &session_state).await?;
65+
66+
// get the logical plan and extract the table name
67+
let raw_logical_plan = session_state
68+
.create_logical_plan(&query_request.query)
69+
.await?;
70+
let mut visitor = TableScanVisitor::default();
71+
let _ = raw_logical_plan.visit(&mut visitor);
72+
let table_name = visitor.into_inner().pop().unwrap();
6573

6674
if CONFIG.parseable.mode == Mode::Query {
67-
if let Ok(new_schema) = fetch_schema(&query.table_name().unwrap()).await {
68-
commit_schema_to_storage(&query.table_name().unwrap(), new_schema.clone())
75+
if let Ok(new_schema) = fetch_schema(&table_name).await {
76+
commit_schema_to_storage(&table_name, new_schema.clone())
6977
.await
7078
.map_err(|err| {
7179
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
7280
})?;
73-
commit_schema(&query.table_name().unwrap(), Arc::new(new_schema))
81+
commit_schema(&table_name, Arc::new(new_schema))
7482
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
7583
}
7684
}
7785

86+
let mut query = into_query(&query_request, &session_state).await?;
87+
7888
// ? run this code only if the query start time and now is less than 1 minute + margin
7989
let mmem = if CONFIG.parseable.mode == Mode::Query {
8090
// create a new query to send to the ingesters

server/src/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,12 @@ impl Query {
162162
}
163163

164164
#[derive(Debug, Default)]
165-
struct TableScanVisitor {
165+
pub(crate) struct TableScanVisitor {
166166
tables: Vec<String>,
167167
}
168168

169169
impl TableScanVisitor {
170-
fn into_inner(self) -> Vec<String> {
170+
pub fn into_inner(self) -> Vec<String> {
171171
self.tables
172172
}
173173
}

0 commit comments

Comments
 (0)