11use arrow_flight:: flight_service_server:: FlightServiceServer ;
2+ use arrow_flight:: PollInfo ;
23use arrow_schema:: ArrowError ;
34use datafusion:: common:: tree_node:: TreeNode ;
45use std:: net:: SocketAddr ;
@@ -15,9 +16,8 @@ use crate::option::{Mode, CONFIG};
1516
1617use crate :: handlers:: livetail:: cross_origin_config;
1718
18- use crate :: handlers:: http:: query:: { into_query, Query as QueryJson } ;
19+ use crate :: handlers:: http:: query:: { authorize_and_set_filter_tags , into_query, Query as QueryJson } ;
1920use crate :: query:: { TableScanVisitor , QUERY_SESSION } ;
20- use crate :: rbac:: role:: Permission ;
2121use crate :: storage:: object_storage:: commit_schema_to_storage;
2222use arrow_ipc:: writer:: { DictionaryTracker , IpcDataGenerator , IpcWriteOptions } ;
2323use futures:: stream:: BoxStream ;
@@ -34,7 +34,6 @@ use crate::handlers::livetail::extract_session_key;
3434
3535use crate :: metadata:: STREAM_INFO ;
3636
37- use crate :: rbac:: role:: Action as RoleAction ;
3837use crate :: rbac:: Users ;
3938
4039#[ derive( Clone ) ]
@@ -69,6 +68,13 @@ impl FlightService for AirServiceImpl {
6968 Err ( Status :: unimplemented ( "Implement list_flights" ) )
7069 }
7170
71+ async fn poll_flight_info (
72+ & self ,
73+ _request : Request < FlightDescriptor > ,
74+ ) -> Result < Response < PollInfo > , Status > {
75+ Err ( Status :: unimplemented ( "Implement poll_flight_info" ) )
76+ }
77+
7278 async fn get_flight_info (
7379 & self ,
7480 _request : Request < FlightDescriptor > ,
@@ -117,20 +123,20 @@ impl FlightService for AirServiceImpl {
117123 let mut visitor = TableScanVisitor :: default ( ) ;
118124 let _ = raw_logical_plan. visit ( & mut visitor) ;
119125
120- let table_name = visitor
121- . into_inner ( )
122- . pop ( )
123- . ok_or ( Status :: invalid_argument ( "No table found from sql" ) ) ?;
126+ let tables = visitor. into_inner ( ) ;
124127
125128 if CONFIG . parseable . mode == Mode :: Query {
126129 // using http to get the schema. may update to use flight later
127- if let Ok ( new_schema) = fetch_schema ( & table_name) . await {
128- // commit schema merges the schema internally and updates the schema in storage.
129- commit_schema_to_storage ( & table_name, new_schema. clone ( ) )
130- . await
131- . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
132- commit_schema ( & table_name, Arc :: new ( new_schema) )
133- . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
130+
131+ for table in tables {
132+ if let Ok ( new_schema) = fetch_schema ( & table) . await {
133+ // commit schema merges the schema internally and updates the schema in storage.
134+ commit_schema_to_storage ( & table, new_schema. clone ( ) )
135+ . await
136+ . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
137+ commit_schema ( & table, Arc :: new ( new_schema) )
138+ . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
139+ }
134140 }
135141 }
136142
@@ -141,47 +147,20 @@ impl FlightService for AirServiceImpl {
141147
142148 // if table name is not present it is a Malformed Query
143149 let stream_name = query
144- . table_name ( )
145- . ok_or ( Status :: invalid_argument ( "Malformed Query" ) ) ?;
150+ . first_table_name ( )
151+ . ok_or_else ( || Status :: invalid_argument ( "Malformed Query" ) ) ?;
146152
147153 let permissions = Users . get_permissions ( & key) ;
148154
149- let table_name = query. table_name ( ) ;
150- if let Some ( ref table) = table_name {
151- let mut authorized = false ;
152- let mut tags = Vec :: new ( ) ;
153-
154- // in permission check if user can run query on the stream.
155- // also while iterating add any filter tags for this stream
156- for permission in permissions {
157- match permission {
158- Permission :: Stream ( RoleAction :: All , _) => {
159- authorized = true ;
160- break ;
161- }
162- Permission :: StreamWithTag ( RoleAction :: Query , ref stream, tag)
163- if stream == table || stream == "*" =>
164- {
165- authorized = true ;
166- if let Some ( tag) = tag {
167- tags. push ( tag)
168- }
169- }
170- _ => ( ) ,
171- }
172- }
173-
174- if !authorized {
175- return Err ( Status :: permission_denied ( "User Not Authorized" ) ) ;
176- }
177-
178- if !tags. is_empty ( ) {
179- query. filter_tag = Some ( tags)
180- }
181- }
155+ let table_name = query
156+ . first_table_name ( )
157+ . ok_or_else ( || Status :: invalid_argument ( "Malformed Query" ) ) ?;
158+ authorize_and_set_filter_tags ( & mut query, permissions, & table_name) . map_err ( |_| {
159+ Status :: permission_denied ( "User Does not have permission to access this" )
160+ } ) ?;
182161
183162 let ( results, _) = query
184- . execute ( table_name. clone ( ) . unwrap ( ) )
163+ . execute ( table_name. clone ( ) )
185164 . await
186165 . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
187166 let schema = STREAM_INFO
0 commit comments