Skip to content

Commit 1255b16

Browse files
committed
fix: schema update where querying
1 parent 09b9e40 commit 1255b16

File tree

1 file changed

+21
-24
lines changed

1 file changed

+21
-24
lines changed

server/src/handlers/http/query.rs

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
2222
use chrono::{DateTime, Utc};
23-
use datafusion::common::tree_node::TreeNode;
2423
use datafusion::error::DataFusionError;
2524
use datafusion::execution::context::SessionState;
2625
use futures_util::Future;
@@ -34,10 +33,11 @@ use crate::event::error::EventError;
3433
use crate::handlers::http::fetch_schema;
3534

3635
use crate::event::commit_schema;
36+
use crate::metadata::STREAM_INFO;
3737
use crate::metrics::QUERY_EXECUTE_TIME;
3838
use crate::option::{Mode, CONFIG};
3939
use crate::query::error::ExecuteError;
40-
use crate::query::{TableScanVisitor, QUERY_SESSION};
40+
use crate::query::QUERY_SESSION;
4141
use crate::rbac::role::{Action, Permission};
4242
use crate::rbac::Users;
4343
use crate::response::QueryResponse;
@@ -63,27 +63,26 @@ pub struct Query {
6363
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
6464
let session_state = QUERY_SESSION.state();
6565

66-
// get the logical plan and extract the table name
67-
let raw_logical_plan = session_state
68-
.create_logical_plan(&query_request.query)
69-
.await?;
70-
// create a visitor to extract the table name
71-
let mut visitor = TableScanVisitor::default();
72-
let _ = raw_logical_plan.visit(&mut visitor);
73-
let table_name = visitor
74-
.into_inner()
75-
.pop()
76-
.ok_or(QueryError::MalformedQuery(
77-
"No table found from sql".to_string(),
78-
))?;
79-
8066
if CONFIG.parseable.mode == Mode::Query {
81-
if let Ok(new_schema) = fetch_schema(&table_name).await {
82-
// commit schema merges the schema internally and updates the schema in storage.
83-
commit_schema_to_storage(&table_name, new_schema.clone())
84-
.await
85-
.map_err(QueryError::ObjectStorage)?;
86-
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
67+
for stream in STREAM_INFO.list_streams() {
68+
// figure out how to update the schema of only required streams
69+
match fetch_schema(&stream).await {
70+
Ok(new_schema) => {
71+
// commit schema merges the schema internally and updates the schema in storage.
72+
commit_schema_to_storage(&stream, new_schema.clone())
73+
.await
74+
.map_err(QueryError::ObjectStorage)?;
75+
commit_schema(&stream, Arc::new(new_schema)).map_err(QueryError::EventError)?;
76+
}
77+
Err(err) => {
78+
log::error!(
79+
"Failed to fetch schema for stream: {}, Error: {}",
80+
stream,
81+
err
82+
);
83+
continue;
84+
}
85+
}
8786
}
8887
}
8988

@@ -283,8 +282,6 @@ pub enum QueryError {
283282
ObjectStorage(#[from] ObjectStorageError),
284283
#[error("Evern Error: {0}")]
285284
EventError(#[from] EventError),
286-
#[error("Error: {0}")]
287-
MalformedQuery(String),
288285
}
289286

290287
impl actix_web::ResponseError for QueryError {

0 commit comments

Comments
 (0)