Skip to content

Commit 3818795

Browse files
committed
add staging query in airplane response(TBT)
1 parent 9797056 commit 3818795

File tree

4 files changed

+96
-18
lines changed

4 files changed

+96
-18
lines changed

server/src/handlers/airplane.rs

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
1+
use arrow_array::RecordBatch;
12
use arrow_flight::flight_service_server::FlightServiceServer;
2-
use arrow_flight::PollInfo;
3+
use arrow_flight::{FlightClient, PollInfo};
34
use arrow_schema::ArrowError;
5+
use chrono::Utc;
46
use datafusion::common::tree_node::TreeNode;
7+
use futures::TryStreamExt;
8+
use http::Uri;
9+
use itertools::Itertools;
10+
use serde_json::Value as JsonValue;
511
use std::net::SocketAddr;
612
use std::sync::Arc;
713

814
use futures_util::{Future, TryFutureExt};
915

10-
use tonic::transport::{Identity, Server, ServerTlsConfig};
16+
use tonic::transport::{Channel, Identity, Server, ServerTlsConfig};
1117
use tonic_web::GrpcWebLayer;
1218

1319
use crate::event::commit_schema;
20+
use crate::handlers::http::cluster::get_ingestor_info;
1421
use crate::handlers::http::fetch_schema;
1522
use crate::option::{Mode, CONFIG};
1623

@@ -103,8 +110,27 @@ impl FlightService for AirServiceImpl {
103110

104111
async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
105112
let key = extract_session_key(req.metadata())?;
106-
let ticket = serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
107-
.map_err(|err| Status::internal(err.to_string()))?;
113+
114+
let ticket = if CONFIG.parseable.mode == Mode::Ingest {
115+
let query = serde_json::from_slice::<JsonValue>(&req.into_inner().ticket)
116+
.map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"]
117+
.as_str()
118+
.ok_or_else(|| Status::failed_precondition("query is not valid string"))?
119+
.to_owned();
120+
QueryJson {
121+
query,
122+
send_null: false,
123+
fields: false,
124+
filter_tags: None,
125+
// we can use humantime because into_query handle parseing
126+
end_time: String::from("now"),
127+
start_time: String::from("1min"),
128+
}
129+
} else {
130+
serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
131+
.map_err(|err| Status::internal(err.to_string()))?
132+
};
133+
108134
log::info!("airplane requested for query {:?}", ticket);
109135

110136
// get the query session_state
@@ -144,6 +170,49 @@ impl FlightService for AirServiceImpl {
144170
let mut query = into_query(&ticket, &session_state)
145171
.await
146172
.map_err(|_| Status::internal("Failed to parse query"))?;
173+
let time_delta = query.end - Utc::now();
174+
175+
let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2
176+
{
177+
let sql = ticket.query.clone();
178+
let ingester_metadatas = get_ingestor_info()
179+
.await
180+
.map_err(|err| Status::failed_precondition(err.to_string()))?;
181+
let mut minute_result: Vec<RecordBatch> = vec![];
182+
183+
for im in ingester_metadatas.iter() {
184+
let mut url = im.domain_name.rsplit(":").collect_vec();
185+
let _ = url.pop();
186+
url.reverse();
187+
url.push(&im.flight_port);
188+
let url = url.join("");
189+
let url = url
190+
.parse::<Uri>()
191+
.map_err(|_| Status::failed_precondition("Ingester metadata is courupted"))?;
192+
193+
let channel = Channel::builder(url)
194+
.connect()
195+
.await
196+
.map_err(|err| Status::failed_precondition(err.to_string()))?;
197+
198+
let mut client = FlightClient::new(channel);
199+
client.add_header("authorization", &im.token)?;
200+
201+
let response = client
202+
.do_get(Ticket {
203+
ticket: sql.clone().into(),
204+
})
205+
.await?;
206+
207+
let mut batches: Vec<RecordBatch> = response.try_collect().await?;
208+
209+
minute_result.append(&mut batches);
210+
}
211+
212+
Some(minute_result)
213+
} else {
214+
None
215+
};
147216

148217
// if table name is not present it is a Malformed Query
149218
let stream_name = query
@@ -152,20 +221,23 @@ impl FlightService for AirServiceImpl {
152221

153222
let permissions = Users.get_permissions(&key);
154223

155-
let table_name = query
156-
.first_table_name()
157-
.ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
158-
authorize_and_set_filter_tags(&mut query, permissions, &table_name).map_err(|_| {
224+
authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
159225
Status::permission_denied("User Does not have permission to access this")
160226
})?;
161227

162-
let (results, _) = query
163-
.execute(table_name.clone())
164-
.await
165-
.map_err(|err| Status::internal(err.to_string()))?;
166228
let schema = STREAM_INFO
167229
.schema(&stream_name)
168230
.map_err(|err| Status::failed_precondition(err.to_string()))?;
231+
232+
let (mut results, _) = query
233+
.execute(stream_name)
234+
.await
235+
.map_err(|err| Status::internal(err.to_string()))?;
236+
237+
if let Some(mut minute_result) = minute_result {
238+
results.append(&mut minute_result);
239+
}
240+
169241
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
170242
let schema_flight_data = SchemaAsIpc::new(&schema, &options);
171243

server/src/handlers/http/modal/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub struct IngestorMetadata {
6262
pub bucket_name: String,
6363
pub token: String,
6464
pub ingestor_id: String,
65+
pub flight_port: String,
6566
}
6667

6768
impl IngestorMetadata {
@@ -73,6 +74,7 @@ impl IngestorMetadata {
7374
username: &str,
7475
password: &str,
7576
ingestor_id: String,
77+
flight_port: String,
7678
) -> Self {
7779
let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));
7880

@@ -85,6 +87,7 @@ impl IngestorMetadata {
8587
bucket_name,
8688
token,
8789
ingestor_id,
90+
flight_port,
8891
}
8992
}
9093

@@ -110,9 +113,10 @@ mod test {
110113
"admin",
111114
"admin",
112115
"ingestor_id".to_string(),
116+
"8002".to_string(),
113117
);
114118

115-
let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap();
119+
let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap();
116120

117121
assert_eq!(rhs, lhs);
118122
}
@@ -127,13 +131,14 @@ mod test {
127131
"admin",
128132
"admin",
129133
"ingestor_id".to_string(),
134+
"8002".to_string(),
130135
);
131136

132137
let lhs = serde_json::to_string(&im)
133138
.unwrap()
134139
.try_into_bytes()
135140
.unwrap();
136-
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#
141+
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
137142
.try_into_bytes()
138143
.unwrap();
139144

server/src/handlers/http/query.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
22-
use chrono::{DateTime, Utc};
22+
use chrono::{DateTime, Duration, Utc};
2323
use datafusion::common::tree_node::TreeNode;
2424
use datafusion::error::DataFusionError;
2525
use datafusion::execution::context::SessionState;
@@ -51,14 +51,14 @@ use crate::utils::actix::extract_session_key_from_req;
5151
#[serde(rename_all = "camelCase")]
5252
pub struct Query {
5353
pub query: String,
54-
start_time: String,
55-
end_time: String,
54+
pub start_time: String,
55+
pub end_time: String,
5656
#[serde(default)]
5757
pub send_null: bool,
5858
#[serde(skip)]
5959
pub fields: bool,
6060
#[serde(skip)]
61-
filter_tags: Option<Vec<String>>,
61+
pub filter_tags: Option<Vec<String>>,
6262
}
6363

6464
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {

server/src/storage/staging.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
366366
&CONFIG.parseable.username,
367367
&CONFIG.parseable.password,
368368
get_ingestor_id(),
369+
CONFIG.parseable.flight_port,
369370
);
370371

371372
put_ingestor_info(out.clone())?;

0 commit comments

Comments
 (0)