Skip to content

Commit 5eb1850

Browse files
committed
impl arrow flight protocol for querying
1 parent b2429b6 commit 5eb1850

File tree

8 files changed

+338
-10
lines changed

8 files changed

+338
-10
lines changed

server/src/cli.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ pub struct Cli {
8989

9090
/// public address for the parseable server ingestor
9191
pub ingestor_endpoint: String,
92+
93+
/// port use by airplane(flight query service)
94+
pub flight_port: u16,
9295
}
9396

9497
impl Cli {
@@ -118,6 +121,7 @@ impl Cli {
118121
pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
119122
pub const DEFAULT_USERNAME: &'static str = "admin";
120123
pub const DEFAULT_PASSWORD: &'static str = "admin";
124+
pub const FLIGHT_PORT: &'static str = "flight-port";
121125

122126
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
123127
self.local_staging_path.join(stream_name)
@@ -275,6 +279,16 @@ impl Cli {
275279
.value_parser(value_parser!(u16))
276280
.help("Port for gRPC server"),
277281
)
282+
.arg(
283+
Arg::new(Self::FLIGHT_PORT)
284+
.long(Self::FLIGHT_PORT)
285+
.env("P_FLIGHT_PORT")
286+
.value_name("PORT")
287+
.default_value("8002")
288+
.required(false)
289+
.value_parser(value_parser!(u16))
290+
.help("Port for Arrow Flight Querying Engine"),
291+
)
278292
.arg(
279293
Arg::new(Self::LIVETAIL_CAPACITY)
280294
.long(Self::LIVETAIL_CAPACITY)
@@ -317,11 +331,11 @@ impl Cli {
317331
.help("Mode of operation"),
318332
)
319333
.arg(
320-
Arg::new(Self::INGESTOR_ENDPOINT)
321-
.long(Self::INGESTOR_ENDPOINT)
322-
.env("P_INGESTOR_ENDPOINT")
323-
.value_name("URL")
324-
.required(false)
334+
Arg::new(Self::INGESTOR_ENDPOINT)
335+
.long(Self::INGESTOR_ENDPOINT)
336+
.env("P_INGESTOR_ENDPOINT")
337+
.value_name("URL")
338+
.required(false)
325339
.help("URL to connect to this specific ingestor. Default is the address of the server.")
326340
)
327341
.arg(
@@ -401,6 +415,10 @@ impl FromArgMatches for Cli {
401415
.get_one::<u16>(Self::GRPC_PORT)
402416
.cloned()
403417
.expect("default for livetail port");
418+
self.flight_port = m
419+
.get_one::<u16>(Self::FLIGHT_PORT)
420+
.cloned()
421+
.expect("default for flight port");
404422
self.livetail_channel_capacity = m
405423
.get_one::<usize>(Self::LIVETAIL_CAPACITY)
406424
.cloned()

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*
1717
*/
1818

19+
pub mod airplane;
1920
pub mod http;
2021
pub mod livetail;
2122

server/src/handlers/airplane.rs

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
use arrow_flight::flight_service_server::FlightServiceServer;
2+
use arrow_schema::ArrowError;
3+
use datafusion::common::tree_node::TreeNode;
4+
use std::net::SocketAddr;
5+
use std::sync::Arc;
6+
7+
use futures_util::{Future, TryFutureExt};
8+
9+
use tonic::transport::{Identity, Server, ServerTlsConfig};
10+
use tonic_web::GrpcWebLayer;
11+
12+
use crate::event::commit_schema;
13+
use crate::handlers::http::fetch_schema;
14+
use crate::option::{Mode, CONFIG};
15+
16+
use crate::handlers::livetail::cross_origin_config;
17+
18+
use crate::handlers::http::query::{into_query, Query as QueryJson};
19+
use crate::query::{TableScanVisitor, QUERY_SESSION};
20+
use crate::rbac::role::Permission;
21+
use crate::storage::object_storage::commit_schema_to_storage;
22+
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
23+
use futures::stream::BoxStream;
24+
25+
use tonic::{Request, Response, Status, Streaming};
26+
27+
use arrow_flight::{
28+
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
29+
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
30+
SchemaResult, Ticket,
31+
};
32+
33+
use crate::handlers::livetail::extract_session_key;
34+
35+
use crate::metadata::STREAM_INFO;
36+
37+
use crate::rbac::role::Action as RoleAction;
38+
use crate::rbac::Users;
39+
40+
#[derive(Clone)]
41+
pub struct AirServiceImpl {}
42+
43+
#[tonic::async_trait]
44+
impl FlightService for AirServiceImpl {
45+
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
46+
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
47+
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
48+
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
49+
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
50+
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
51+
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;
52+
53+
async fn handshake(
54+
&self,
55+
_request: Request<Streaming<HandshakeRequest>>,
56+
) -> Result<Response<Self::HandshakeStream>, Status> {
57+
Err(Status::unimplemented(
58+
"handshake is disabled in favour of direct authentication and authorization",
59+
))
60+
}
61+
62+
/// list_flights is an operation that allows a client
63+
/// to query a Flight server for information
64+
/// about available datasets or "flights" that the server can provide.
65+
async fn list_flights(
66+
&self,
67+
_request: Request<Criteria>,
68+
) -> Result<Response<Self::ListFlightsStream>, Status> {
69+
Err(Status::unimplemented("Implement list_flights"))
70+
}
71+
72+
async fn get_flight_info(
73+
&self,
74+
_request: Request<FlightDescriptor>,
75+
) -> Result<Response<FlightInfo>, Status> {
76+
Err(Status::unimplemented("Implement get_flight_info"))
77+
}
78+
79+
async fn get_schema(
80+
&self,
81+
request: Request<FlightDescriptor>,
82+
) -> Result<Response<SchemaResult>, Status> {
83+
let table_name = request.into_inner().path;
84+
let table_name = table_name[0].clone();
85+
86+
let schema = STREAM_INFO
87+
.schema(&table_name)
88+
.map_err(|err| Status::failed_precondition(err.to_string()))?;
89+
90+
let options = IpcWriteOptions::default();
91+
let schema_result = SchemaAsIpc::new(&schema, &options)
92+
.try_into()
93+
.map_err(|err: ArrowError| Status::internal(err.to_string()))?;
94+
95+
Ok(Response::new(schema_result))
96+
}
97+
98+
async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
99+
let key = extract_session_key(req.metadata())?;
100+
let ticket = serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
101+
.map_err(|err| Status::internal(err.to_string()))?;
102+
log::info!("airplane requested for query {:?}", ticket);
103+
104+
// get the query session_state
105+
let session_state = QUERY_SESSION.state();
106+
107+
// get the logical plan and extract the table name
108+
let raw_logical_plan = session_state
109+
.create_logical_plan(&ticket.query)
110+
.await
111+
.map_err(|err| {
112+
log::error!("Failed to create logical plan: {}", err);
113+
Status::internal("Failed to create logical plan")
114+
})?;
115+
116+
// create a visitor to extract the table name
117+
let mut visitor = TableScanVisitor::default();
118+
let _ = raw_logical_plan.visit(&mut visitor);
119+
120+
let table_name = visitor
121+
.into_inner()
122+
.pop()
123+
.ok_or(Status::invalid_argument("No table found from sql"))?;
124+
125+
if CONFIG.parseable.mode == Mode::Query {
126+
// using http to get the schema. may update to use flight later
127+
if let Ok(new_schema) = fetch_schema(&table_name).await {
128+
// commit schema merges the schema internally and updates the schema in storage.
129+
commit_schema_to_storage(&table_name, new_schema.clone())
130+
.await
131+
.map_err(|err| Status::internal(err.to_string()))?;
132+
commit_schema(&table_name, Arc::new(new_schema))
133+
.map_err(|err| Status::internal(err.to_string()))?;
134+
}
135+
}
136+
137+
// map payload to query
138+
let mut query = into_query(&ticket, &session_state)
139+
.await
140+
.map_err(|_| Status::internal("Failed to parse query"))?;
141+
142+
// if table name is not present it is a Malformed Query
143+
let stream_name = query
144+
.table_name()
145+
.ok_or(Status::invalid_argument("Malformed Query"))?;
146+
147+
let permissions = Users.get_permissions(&key);
148+
149+
let table_name = query.table_name();
150+
if let Some(ref table) = table_name {
151+
let mut authorized = false;
152+
let mut tags = Vec::new();
153+
154+
// in permission check if user can run query on the stream.
155+
// also while iterating add any filter tags for this stream
156+
for permission in permissions {
157+
match permission {
158+
Permission::Stream(RoleAction::All, _) => {
159+
authorized = true;
160+
break;
161+
}
162+
Permission::StreamWithTag(RoleAction::Query, ref stream, tag)
163+
if stream == table || stream == "*" =>
164+
{
165+
authorized = true;
166+
if let Some(tag) = tag {
167+
tags.push(tag)
168+
}
169+
}
170+
_ => (),
171+
}
172+
}
173+
174+
if !authorized {
175+
return Err(Status::permission_denied("User Not Authorized"));
176+
}
177+
178+
if !tags.is_empty() {
179+
query.filter_tag = Some(tags)
180+
}
181+
}
182+
183+
let (results, _) = query
184+
.execute(table_name.clone().unwrap())
185+
.await
186+
.map_err(|err| Status::internal(err.to_string()))?;
187+
let schema = STREAM_INFO
188+
.schema(&stream_name)
189+
.map_err(|err| Status::failed_precondition(err.to_string()))?;
190+
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
191+
let schema_flight_data = SchemaAsIpc::new(&schema, &options);
192+
193+
let mut flights = vec![FlightData::from(schema_flight_data)];
194+
let encoder = IpcDataGenerator::default();
195+
let mut tracker = DictionaryTracker::new(false);
196+
for batch in &results {
197+
let (flight_dictionaries, flight_batch) = encoder
198+
.encoded_batch(batch, &mut tracker, &options)
199+
.map_err(|e| Status::internal(e.to_string()))?;
200+
flights.extend(flight_dictionaries.into_iter().map(Into::into));
201+
flights.push(flight_batch.into());
202+
}
203+
let output = futures::stream::iter(flights.into_iter().map(Ok));
204+
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
205+
}
206+
207+
async fn do_put(
208+
&self,
209+
_request: Request<Streaming<FlightData>>,
210+
) -> Result<Response<Self::DoPutStream>, Status> {
211+
Err(Status::unimplemented(
212+
"do_put not implemented because we are only using flight for querying",
213+
))
214+
}
215+
216+
async fn do_action(
217+
&self,
218+
_request: Request<Action>,
219+
) -> Result<Response<Self::DoActionStream>, Status> {
220+
Err(Status::unimplemented(
221+
"do_action not implemented because we are only using flight for querying",
222+
))
223+
}
224+
225+
async fn list_actions(
226+
&self,
227+
_request: Request<Empty>,
228+
) -> Result<Response<Self::ListActionsStream>, Status> {
229+
Err(Status::unimplemented(
230+
"list_actions not implemented because we are only using flight for querying",
231+
))
232+
}
233+
234+
async fn do_exchange(
235+
&self,
236+
_request: Request<Streaming<FlightData>>,
237+
) -> Result<Response<Self::DoExchangeStream>, Status> {
238+
Err(Status::unimplemented(
239+
"do_exchange not implemented because we are only using flight for querying",
240+
))
241+
}
242+
}
243+
244+
pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send {
245+
let mut addr: SocketAddr = CONFIG
246+
.parseable
247+
.address
248+
.parse()
249+
.expect("valid socket address");
250+
addr.set_port(CONFIG.parseable.flight_port);
251+
252+
let service = AirServiceImpl {};
253+
254+
let svc = FlightServiceServer::new(service);
255+
256+
let cors = cross_origin_config();
257+
258+
let identity = match (
259+
&CONFIG.parseable.tls_cert_path,
260+
&CONFIG.parseable.tls_key_path,
261+
) {
262+
(Some(cert), Some(key)) => {
263+
match (std::fs::read_to_string(cert), std::fs::read_to_string(key)) {
264+
(Ok(cert_file), Ok(key_file)) => {
265+
let identity = Identity::from_pem(cert_file, key_file);
266+
Some(identity)
267+
}
268+
_ => None,
269+
}
270+
}
271+
(_, _) => None,
272+
};
273+
274+
let config = identity.map(|id| ServerTlsConfig::new().identity(id));
275+
276+
// rust is treating closures as different types
277+
let err_map_fn = |err| Box::new(err) as Box<dyn std::error::Error + Send>;
278+
279+
// match on config to decide if we want to use tls or not
280+
match config {
281+
Some(config) => {
282+
let server = match Server::builder().tls_config(config) {
283+
Ok(server) => server,
284+
Err(_) => Server::builder(),
285+
};
286+
287+
server
288+
.accept_http1(true)
289+
.layer(cors)
290+
.layer(GrpcWebLayer::new())
291+
.add_service(svc)
292+
.serve(addr)
293+
.map_err(err_map_fn)
294+
}
295+
None => Server::builder()
296+
.accept_http1(true)
297+
.layer(cors)
298+
.layer(GrpcWebLayer::new())
299+
.add_service(svc)
300+
.serve(addr)
301+
.map_err(err_map_fn),
302+
}
303+
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::analytics;
2020
use crate::banner;
21+
use crate::handlers::airplane;
2122
use crate::handlers::http::logstream;
2223
use crate::handlers::http::middleware::RouteExt;
2324
use crate::localcache::LocalCacheManager;
@@ -336,7 +337,10 @@ impl IngestServer {
336337
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
337338
sync::object_store_sync();
338339

340+
tokio::spawn(airplane::server());
341+
339342
let app = self.start(prometheus, CONFIG.parseable.openid.clone());
343+
340344
tokio::pin!(app);
341345
loop {
342346
tokio::select! {

0 commit comments

Comments
 (0)