Skip to content

Commit d972fcb

Browse files
authored
Make common sql executor (#209)
* Make common sql executor * Make common sql executor
1 parent 2006f0c commit d972fcb

File tree

4 files changed

+84
-62
lines changed

4 files changed

+84
-62
lines changed

bin/icehutd/src/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub struct IceHutOpts {
6666

6767
#[arg(long, env="FILE_STORAGE_PATH",
6868
required_if_eq("backend", "file"),
69-
conflicts_with_all(["access_key_id", "secret_access_key", "region", "bucket", "endpoint", "allow_http"]),
69+
conflicts_with_all(["region", "bucket", "endpoint", "allow_http"]),
7070
help_heading="File Backend Options",
7171
help="Path to the directory where files will be stored"
7272
)]

crates/control_plane/src/service.rs

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,18 @@ use arrow_json::writer::JsonArray;
88
use arrow_json::WriterBuilder;
99
use async_trait::async_trait;
1010
use bytes::Bytes;
11-
use datafusion::execution::context::SessionContext;
12-
use datafusion::execution::SessionStateBuilder;
13-
use datafusion::prelude::{CsvReadOptions, SessionConfig};
11+
use datafusion::prelude::CsvReadOptions;
1412
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
15-
use datafusion_iceberg::planner::IcebergQueryPlanner;
1613
use iceberg_rest_catalog::apis::configuration::Configuration;
1714
use iceberg_rest_catalog::catalog::RestCatalog;
1815
use object_store::path::Path;
1916
use object_store::{ObjectStore, PutPayload};
2017
use runtime::datafusion::execution::SqlExecutor;
21-
use runtime::datafusion::type_planner::CustomTypePlanner;
18+
use runtime::datafusion::session::Session;
2219
use rusoto_core::{HttpClient, Region};
2320
use rusoto_credential::StaticProvider;
2421
use rusoto_s3::{GetBucketAclRequest, S3Client, S3};
2522
use snafu::ResultExt;
26-
use std::env;
2723
use std::sync::Arc;
2824
use url::Url;
2925
use uuid::Uuid;
@@ -77,16 +73,21 @@ pub trait ControlService: Send + Sync {
7773
pub struct ControlServiceImpl {
7874
storage_profile_repo: Arc<dyn StorageProfileRepository>,
7975
warehouse_repo: Arc<dyn WarehouseRepository>,
76+
executor: SqlExecutor,
8077
}
8178

8279
impl ControlServiceImpl {
8380
pub fn new(
8481
storage_profile_repo: Arc<dyn StorageProfileRepository>,
8582
warehouse_repo: Arc<dyn WarehouseRepository>,
8683
) -> Self {
84+
let session = Session::default();
85+
#[allow(clippy::unwrap_used)]
86+
let executor = SqlExecutor::new(session.ctx).unwrap();
8787
Self {
8888
storage_profile_repo,
8989
warehouse_repo,
90+
executor,
9091
}
9192
}
9293
}
@@ -203,29 +204,13 @@ impl ControlService for ControlServiceImpl {
203204
#[tracing::instrument(level = "debug", skip(self))]
204205
#[allow(clippy::large_futures)]
205206
async fn query(&self, query: &str) -> ControlPlaneResult<(Vec<RecordBatch>, Vec<ColumnInfo>)> {
206-
let sql_parser_dialect =
207-
env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string());
208-
let state = SessionStateBuilder::new()
209-
.with_config(
210-
SessionConfig::new()
211-
.with_information_schema(true)
212-
.set_str("datafusion.sql_parser.dialect", &sql_parser_dialect),
213-
)
214-
.with_default_features()
215-
.with_query_planner(Arc::new(IcebergQueryPlanner {}))
216-
.with_type_planner(Arc::new(CustomTypePlanner {}))
217-
.build();
218-
let ctx = SessionContext::new_with_state(state);
219-
220-
// TODO: Should be shared context
221-
let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?;
222-
223-
let query = executor.preprocess_query(query);
224-
let statement = executor
207+
let query = self.executor.preprocess_query(query);
208+
let statement = self
209+
.executor
225210
.parse_query(&query)
226211
.context(super::error::DataFusionSnafu)?;
227212

228-
let table_ref = executor.get_table_path(&statement);
213+
let table_ref = self.executor.get_table_path(&statement);
229214
let warehouse_name = table_ref
230215
.as_ref()
231216
.and_then(|table_ref| table_ref.catalog())
@@ -236,7 +221,7 @@ impl ControlService for ControlServiceImpl {
236221
(String::from("datafusion"), String::new())
237222
} else {
238223
let warehouse = self.get_warehouse_by_name(warehouse_name.clone()).await?;
239-
if executor.ctx.catalog(warehouse.name.as_str()).is_none() {
224+
if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
240225
let storage_profile = self.get_profile(warehouse.storage_profile_id).await?;
241226

242227
let config = {
@@ -254,24 +239,27 @@ impl ControlService for ControlServiceImpl {
254239
);
255240

256241
let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
257-
executor
258-
.ctx
259-
.register_catalog(warehouse.name.clone(), Arc::new(catalog));
242+
if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
243+
self.executor
244+
.ctx
245+
.register_catalog(warehouse.name.clone(), Arc::new(catalog));
246+
}
260247

261248
let object_store = storage_profile
262249
.get_object_store()
263250
.context(crate::error::InvalidStorageProfileSnafu)?;
264251
let endpoint_url = storage_profile
265252
.get_object_store_endpoint_url()
266253
.map_err(|_| ControlPlaneError::MissingStorageEndpointURL)?;
267-
executor
254+
self.executor
268255
.ctx
269256
.register_object_store(&endpoint_url, Arc::from(object_store));
270257
}
271258
(warehouse.name, warehouse.location)
272259
};
273260

274-
let records: Vec<RecordBatch> = executor
261+
let records: Vec<RecordBatch> = self
262+
.executor
275263
.query(&query, catalog_name.as_str(), warehouse_location.as_str())
276264
.await
277265
.context(crate::error::ExecutionSnafu)?
@@ -388,28 +376,26 @@ impl ControlService for ControlServiceImpl {
388376
.await
389377
.context(crate::error::ObjectStoreSnafu)?;
390378

391-
// Create table from CSV
392-
let config = {
393-
let mut config = Configuration::new();
394-
config.base_path = "http://0.0.0.0:3000/catalog".to_string();
395-
config
396-
};
397-
let object_store_builder = storage_profile
398-
.get_object_store_builder()
399-
.context(crate::error::InvalidStorageProfileSnafu)?;
400-
let rest_client = RestCatalog::new(
401-
Some(warehouse_id.to_string().as_str()),
402-
config,
403-
object_store_builder,
404-
);
405-
let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
406-
let state = SessionStateBuilder::new()
407-
.with_default_features()
408-
.with_query_planner(Arc::new(IcebergQueryPlanner {}))
409-
.build();
410-
411-
let ctx = SessionContext::new_with_state(state);
412-
ctx.register_catalog(warehouse_name.clone(), Arc::new(catalog));
379+
if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
380+
// Create table from CSV
381+
let config = {
382+
let mut config = Configuration::new();
383+
config.base_path = "http://0.0.0.0:3000/catalog".to_string();
384+
config
385+
};
386+
let object_store_builder = storage_profile
387+
.get_object_store_builder()
388+
.context(crate::error::InvalidStorageProfileSnafu)?;
389+
let rest_client = RestCatalog::new(
390+
Some(warehouse_id.to_string().as_str()),
391+
config,
392+
object_store_builder,
393+
);
394+
let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
395+
self.executor
396+
.ctx
397+
.register_catalog(warehouse.name.clone(), Arc::new(catalog));
398+
}
413399

414400
// Register CSV file as a table
415401
let storage_endpoint_url = storage_profile
@@ -429,15 +415,18 @@ impl ControlService for ControlServiceImpl {
429415
url: storage_endpoint_url,
430416
},
431417
)?;
432-
ctx.register_object_store(&endpoint_url, Arc::from(object_store));
433-
ctx.register_csv(table_name, path_string, CsvReadOptions::new())
418+
self.executor
419+
.ctx
420+
.register_object_store(&endpoint_url, Arc::from(object_store));
421+
self.executor
422+
.ctx
423+
.register_csv(table_name, path_string, CsvReadOptions::new())
434424
.await?;
435425

436426
let insert_query = format!(
437427
"INSERT INTO {warehouse_name}.{database_name}.{table_name} SELECT * FROM {table_name}"
438428
);
439-
let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?;
440-
executor
429+
self.executor
441430
.execute_with_custom_plan(&insert_query, warehouse_name.as_str())
442431
.await
443432
.context(crate::error::ExecutionSnafu)?;
@@ -553,6 +542,7 @@ mod tests {
553542
};
554543
use crate::repository::InMemoryStorageProfileRepository;
555544
use crate::repository::InMemoryWarehouseRepository;
545+
use std::env;
556546

557547
#[tokio::test]
558548
async fn test_create_warehouse_failed_no_storage_profile() {

crates/runtime/src/datafusion/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ pub mod functions;
44
pub mod planner;
55
//pub mod session;
66
pub mod execution;
7+
pub mod session;
78
pub mod type_planner;
Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,36 @@
1+
use crate::datafusion::type_planner::CustomTypePlanner;
2+
use datafusion::execution::SessionStateBuilder;
3+
use datafusion::prelude::{SessionConfig, SessionContext};
4+
use datafusion_iceberg::planner::IcebergQueryPlanner;
5+
use std::env;
6+
use std::sync::Arc;
17

8+
pub struct Session {
9+
pub ctx: SessionContext,
10+
}
211

3-
pub struct IcehutSession {
4-
pub(crate) ctx: SessionContext
5-
}
12+
impl Default for Session {
13+
fn default() -> Self {
14+
Self::new()
15+
}
16+
}
17+
18+
impl Session {
19+
#[must_use]
20+
pub fn new() -> Self {
21+
let sql_parser_dialect =
22+
env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string());
23+
let state = SessionStateBuilder::new()
24+
.with_config(
25+
SessionConfig::new()
26+
.with_information_schema(true)
27+
.set_str("datafusion.sql_parser.dialect", &sql_parser_dialect),
28+
)
29+
.with_default_features()
30+
.with_query_planner(Arc::new(IcebergQueryPlanner {}))
31+
.with_type_planner(Arc::new(CustomTypePlanner {}))
32+
.build();
33+
let ctx = SessionContext::new_with_state(state);
34+
Self { ctx }
35+
}
36+
}

0 commit comments

Comments (0)