@@ -37,6 +37,7 @@ use crate::event::commit_schema;
3737use crate :: metrics:: QUERY_EXECUTE_TIME ;
3838use crate :: option:: { Mode , CONFIG } ;
3939use crate :: query:: error:: ExecuteError ;
40+ use crate :: query:: Query as LogicalQuery ;
4041use crate :: query:: { TableScanVisitor , QUERY_SESSION } ;
4142use crate :: rbac:: role:: { Action , Permission } ;
4243use crate :: rbac:: Users ;
@@ -67,69 +68,37 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
6768 let raw_logical_plan = session_state
6869 . create_logical_plan ( & query_request. query )
6970 . await ?;
71+
7072 // create a visitor to extract the table name
7173 let mut visitor = TableScanVisitor :: default ( ) ;
7274 let _ = raw_logical_plan. visit ( & mut visitor) ;
73- let table_name = visitor
74- . into_inner ( )
75- . pop ( )
76- . ok_or ( QueryError :: MalformedQuery (
77- "No table found from sql" . to_string ( ) ,
78- ) ) ?;
75+
76+ let tables = visitor. into_inner ( ) ;
7977
8078 if CONFIG . parseable . mode == Mode :: Query {
81- if let Ok ( new_schema) = fetch_schema ( & table_name) . await {
82- // commit schema merges the schema internally and updates the schema in storage.
83- commit_schema_to_storage ( & table_name, new_schema. clone ( ) )
84- . await
85- . map_err ( QueryError :: ObjectStorage ) ?;
86- commit_schema ( & table_name, Arc :: new ( new_schema) ) . map_err ( QueryError :: EventError ) ?;
79+ for table in tables {
80+ if let Ok ( new_schema) = fetch_schema ( & table) . await {
81+ // commit schema merges the schema internally and updates the schema in storage.
82+ commit_schema_to_storage ( & table, new_schema. clone ( ) )
83+ . await
84+ . map_err ( QueryError :: ObjectStorage ) ?;
85+ commit_schema ( & table, Arc :: new ( new_schema) ) . map_err ( QueryError :: EventError ) ?;
86+ }
8787 }
8888 }
89-
90- let mut query = into_query ( & query_request, & session_state) . await ?;
89+ let mut query: LogicalQuery = into_query ( & query_request, & session_state) . await ?;
9190
9291 let creds = extract_session_key_from_req ( & req) . expect ( "expects basic auth" ) ;
93- let permissions = Users . get_permissions ( & creds) ;
92+ let permissions: Vec < Permission > = Users . get_permissions ( & creds) ;
9493
95- // check authorization of this query if it references physical table;
96- let table_name = query. table_name ( ) ;
97- if let Some ( ref table) = table_name {
98- let mut authorized = false ;
99- let mut tags = Vec :: new ( ) ;
100-
101- // in permission check if user can run query on the stream.
102- // also while iterating add any filter tags for this stream
103- for permission in permissions {
104- match permission {
105- Permission :: Stream ( Action :: All , _) => {
106- authorized = true ;
107- break ;
108- }
109- Permission :: StreamWithTag ( Action :: Query , ref stream, tag)
110- if stream == table || stream == "*" =>
111- {
112- authorized = true ;
113- if let Some ( tag) = tag {
114- tags. push ( tag)
115- }
116- }
117- _ => ( ) ,
118- }
119- }
120-
121- if !authorized {
122- return Err ( QueryError :: Unauthorized ) ;
123- }
124-
125- if !tags. is_empty ( ) {
126- query. filter_tag = Some ( tags)
127- }
128- }
94+ let table_name = query
95+ . first_table_name ( )
96+ . ok_or_else ( || QueryError :: MalformedQuery ( "No table name found in query" . to_string ( ) ) ) ?;
97+ authorize_and_set_filter_tags ( & mut query, permissions, & table_name) ?;
12998
13099 let time = Instant :: now ( ) ;
131100
132- let ( records, fields) = query. execute ( table_name. clone ( ) . unwrap ( ) ) . await ?;
101+ let ( records, fields) = query. execute ( table_name. clone ( ) ) . await ?;
133102 let response = QueryResponse {
134103 records,
135104 fields,
@@ -138,16 +107,55 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
138107 }
139108 . to_http ( ) ;
140109
141- if let Some ( table) = table_name {
142- let time = time. elapsed ( ) . as_secs_f64 ( ) ;
143- QUERY_EXECUTE_TIME
144- . with_label_values ( & [ & table] )
145- . observe ( time) ;
146- }
110+ let time = time. elapsed ( ) . as_secs_f64 ( ) ;
111+
112+ QUERY_EXECUTE_TIME
113+ . with_label_values ( & [ & table_name] )
114+ . observe ( time) ;
147115
148116 Ok ( response)
149117}
150118
119+ fn authorize_and_set_filter_tags (
120+ query : & mut LogicalQuery ,
121+ permissions : Vec < Permission > ,
122+ table_name : & str ,
123+ ) -> Result < ( ) , QueryError > {
124+ // check authorization of this query if it references physical table;
125+ let mut authorized = false ;
126+ let mut tags = Vec :: new ( ) ;
127+
128+ // in permission check if user can run query on the stream.
129+ // also while iterating add any filter tags for this stream
130+ for permission in permissions {
131+ match permission {
132+ Permission :: Stream ( Action :: All , _) => {
133+ authorized = true ;
134+ break ;
135+ }
136+ Permission :: StreamWithTag ( Action :: Query , ref stream, tag)
137+ if stream == table_name || stream == "*" =>
138+ {
139+ authorized = true ;
140+ if let Some ( tag) = tag {
141+ tags. push ( tag)
142+ }
143+ }
144+ _ => ( ) ,
145+ }
146+ }
147+
148+ if !authorized {
149+ return Err ( QueryError :: Unauthorized ) ;
150+ }
151+
152+ if !tags. is_empty ( ) {
153+ query. filter_tag = Some ( tags)
154+ }
155+
156+ Ok ( ( ) )
157+ }
158+
151159impl FromRequest for Query {
152160 type Error = actix_web:: Error ;
153161 type Future = Pin < Box < dyn Future < Output = Result < Self , Self :: Error > > > > ;
@@ -178,7 +186,7 @@ impl FromRequest for Query {
178186async fn into_query (
179187 query : & Query ,
180188 session_state : & SessionState ,
181- ) -> Result < crate :: query :: Query , QueryError > {
189+ ) -> Result < LogicalQuery , QueryError > {
182190 if query. query . is_empty ( ) {
183191 return Err ( QueryError :: EmptyQuery ) ;
184192 }
0 commit comments