2 changes: 1 addition & 1 deletion bin/icehutd/src/cli.rs
@@ -66,7 +66,7 @@ pub struct IceHutOpts {

#[arg(long, env="FILE_STORAGE_PATH",
required_if_eq("backend", "file"),
conflicts_with_all(["access_key_id", "secret_access_key", "region", "bucket", "endpoint", "allow_http"]),
conflicts_with_all(["region", "bucket", "endpoint", "allow_http"]),
help_heading="File Backend Options",
help="Path to the directory where files will be stored"
)]
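The effect of this change: `--file-storage-path` still conflicts with the S3 connection options, but no longer with the credential arguments, so the file backend can be selected while credential flags or environment variables are still present. A minimal sketch of how clap's `required_if_eq` and `conflicts_with_all` combine; the struct and field set here are illustrative, not the real `IceHutOpts` definition:

```rust
use clap::Parser;

/// Illustrative subset of the backend options (hypothetical).
#[derive(Parser, Debug)]
struct Opts {
    /// Storage backend to use ("file" or "s3").
    #[arg(long, default_value = "s3")]
    backend: String,

    /// Required when --backend=file; conflicts only with the S3
    /// connection flags, mirroring the trimmed list above.
    #[arg(
        long,
        required_if_eq("backend", "file"),
        conflicts_with_all(["bucket", "endpoint"])
    )]
    file_storage_path: Option<String>,

    #[arg(long)]
    bucket: Option<String>,

    #[arg(long)]
    endpoint: Option<String>,
}

fn main() {
    // `prog --backend file --file-storage-path /tmp/data` parses;
    // adding `--bucket b` to the same invocation is a clap error.
    println!("{:?}", Opts::parse());
}
```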
106 changes: 48 additions & 58 deletions crates/control_plane/src/service.rs
@@ -8,22 +8,18 @@ use arrow_json::writer::JsonArray;
use arrow_json::WriterBuilder;
use async_trait::async_trait;
use bytes::Bytes;
-use datafusion::execution::context::SessionContext;
-use datafusion::execution::SessionStateBuilder;
-use datafusion::prelude::{CsvReadOptions, SessionConfig};
+use datafusion::prelude::CsvReadOptions;
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
-use datafusion_iceberg::planner::IcebergQueryPlanner;
use iceberg_rest_catalog::apis::configuration::Configuration;
use iceberg_rest_catalog::catalog::RestCatalog;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use runtime::datafusion::execution::SqlExecutor;
-use runtime::datafusion::type_planner::CustomTypePlanner;
+use runtime::datafusion::session::Session;
use rusoto_core::{HttpClient, Region};
use rusoto_credential::StaticProvider;
use rusoto_s3::{GetBucketAclRequest, S3Client, S3};
use snafu::ResultExt;
-use std::env;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;
@@ -77,16 +73,21 @@ pub trait ControlService: Send + Sync {
pub struct ControlServiceImpl {
storage_profile_repo: Arc<dyn StorageProfileRepository>,
warehouse_repo: Arc<dyn WarehouseRepository>,
+executor: SqlExecutor,
}

impl ControlServiceImpl {
pub fn new(
storage_profile_repo: Arc<dyn StorageProfileRepository>,
warehouse_repo: Arc<dyn WarehouseRepository>,
) -> Self {
+let session = Session::default();
+#[allow(clippy::unwrap_used)]
+let executor = SqlExecutor::new(session.ctx).unwrap();
Self {
storage_profile_repo,
warehouse_repo,
+executor,
}
}
}
@@ -203,29 +204,13 @@ impl ControlService for ControlServiceImpl {
#[tracing::instrument(level = "debug", skip(self))]
#[allow(clippy::large_futures)]
async fn query(&self, query: &str) -> ControlPlaneResult<(Vec<RecordBatch>, Vec<ColumnInfo>)> {
-let sql_parser_dialect =
-env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string());
-let state = SessionStateBuilder::new()
-.with_config(
-SessionConfig::new()
-.with_information_schema(true)
-.set_str("datafusion.sql_parser.dialect", &sql_parser_dialect),
-)
-.with_default_features()
-.with_query_planner(Arc::new(IcebergQueryPlanner {}))
-.with_type_planner(Arc::new(CustomTypePlanner {}))
-.build();
-let ctx = SessionContext::new_with_state(state);
-
-// TODO: Should be shared context
-let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?;
-
-let query = executor.preprocess_query(query);
-let statement = executor
+let query = self.executor.preprocess_query(query);
+let statement = self
+.executor
.parse_query(&query)
.context(super::error::DataFusionSnafu)?;

-let table_ref = executor.get_table_path(&statement);
+let table_ref = self.executor.get_table_path(&statement);
let warehouse_name = table_ref
.as_ref()
.and_then(|table_ref| table_ref.catalog())
@@ -236,7 +221,7 @@
(String::from("datafusion"), String::new())
} else {
let warehouse = self.get_warehouse_by_name(warehouse_name.clone()).await?;
-if executor.ctx.catalog(warehouse.name.as_str()).is_none() {
+if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
let storage_profile = self.get_profile(warehouse.storage_profile_id).await?;

let config = {
@@ -254,24 +239,27 @@
);

let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
-executor
-.ctx
-.register_catalog(warehouse.name.clone(), Arc::new(catalog));
+if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
+self.executor
+.ctx
+.register_catalog(warehouse.name.clone(), Arc::new(catalog));
+}

let object_store = storage_profile
.get_object_store()
.context(crate::error::InvalidStorageProfileSnafu)?;
let endpoint_url = storage_profile
.get_object_store_endpoint_url()
.map_err(|_| ControlPlaneError::MissingStorageEndpointURL)?;
-executor
+self.executor
.ctx
.register_object_store(&endpoint_url, Arc::from(object_store));
}
(warehouse.name, warehouse.location)
};

-let records: Vec<RecordBatch> = executor
+let records: Vec<RecordBatch> = self
+.executor
.query(&query, catalog_name.as_str(), warehouse_location.as_str())
.await
.context(crate::error::ExecutionSnafu)?
@@ -388,28 +376,26 @@
.await
.context(crate::error::ObjectStoreSnafu)?;

-// Create table from CSV
-let config = {
-let mut config = Configuration::new();
-config.base_path = "http://0.0.0.0:3000/catalog".to_string();
-config
-};
-let object_store_builder = storage_profile
-.get_object_store_builder()
-.context(crate::error::InvalidStorageProfileSnafu)?;
-let rest_client = RestCatalog::new(
-Some(warehouse_id.to_string().as_str()),
-config,
-object_store_builder,
-);
-let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
-let state = SessionStateBuilder::new()
-.with_default_features()
-.with_query_planner(Arc::new(IcebergQueryPlanner {}))
-.build();
-
-let ctx = SessionContext::new_with_state(state);
-ctx.register_catalog(warehouse_name.clone(), Arc::new(catalog));
+if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
+// Create table from CSV
+let config = {
+let mut config = Configuration::new();
+config.base_path = "http://0.0.0.0:3000/catalog".to_string();
+config
+};
+let object_store_builder = storage_profile
+.get_object_store_builder()
+.context(crate::error::InvalidStorageProfileSnafu)?;
+let rest_client = RestCatalog::new(
+Some(warehouse_id.to_string().as_str()),
+config,
+object_store_builder,
+);
+let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
+self.executor
+.ctx
+.register_catalog(warehouse.name.clone(), Arc::new(catalog));
+}

// Register CSV file as a table
let storage_endpoint_url = storage_profile
@@ -429,15 +415,18 @@
url: storage_endpoint_url,
},
)?;
-ctx.register_object_store(&endpoint_url, Arc::from(object_store));
-ctx.register_csv(table_name, path_string, CsvReadOptions::new())
+self.executor
+.ctx
+.register_object_store(&endpoint_url, Arc::from(object_store));
+self.executor
+.ctx
+.register_csv(table_name, path_string, CsvReadOptions::new())
.await?;

let insert_query = format!(
"INSERT INTO {warehouse_name}.{database_name}.{table_name} SELECT * FROM {table_name}"
);
-let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?;
-executor
+self.executor
.execute_with_custom_plan(&insert_query, warehouse_name.as_str())
.await
.context(crate::error::ExecutionSnafu)?;
@@ -553,6 +542,7 @@ mod tests {
};
use crate::repository::InMemoryStorageProfileRepository;
use crate::repository::InMemoryWarehouseRepository;
+use std::env;

#[tokio::test]
async fn test_create_warehouse_failed_no_storage_profile() {
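The thrust of this file's changes: instead of building a fresh `SessionStateBuilder`/`SessionContext` and a throwaway `SqlExecutor` inside every call, the service now holds one `SqlExecutor` constructed from the shared `Session`, and registers each warehouse's catalog lazily on first use behind a `ctx.catalog(...).is_none()` check. A minimal sketch of that guard, pulled out into a hypothetical free-standing helper (the PR performs the same check inline on `self.executor.ctx`):

```rust
use std::sync::Arc;

use datafusion::catalog::CatalogProvider;
use datafusion::prelude::SessionContext;

/// Register `catalog` under `name` only if nothing is registered yet,
/// so repeated queries against the same warehouse skip rebuilding the
/// catalog instead of clobbering the existing entry.
fn ensure_catalog(ctx: &SessionContext, name: &str, catalog: Arc<dyn CatalogProvider>) {
    if ctx.catalog(name).is_none() {
        ctx.register_catalog(name.to_string(), catalog);
    }
}
```

Note the check-then-register pair is not atomic: two racing queries may both build a catalog and the later registration wins. That is benign here, since `register_catalog` simply replaces the entry with an equivalent one.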
1 change: 1 addition & 0 deletions crates/runtime/src/datafusion/mod.rs
@@ -4,4 +4,5 @@ pub mod functions;
pub mod planner;
//pub mod session;
pub mod execution;
+pub mod session;
pub mod type_planner;
37 changes: 34 additions & 3 deletions crates/runtime/src/datafusion/session.rs
@@ -1,5 +1,36 @@
+use crate::datafusion::type_planner::CustomTypePlanner;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_iceberg::planner::IcebergQueryPlanner;
+use std::env;
+use std::sync::Arc;
+
+pub struct Session {
+pub ctx: SessionContext,
+}
+
-pub struct IcehutSession {
-pub(crate) ctx: SessionContext
-}
+impl Default for Session {
+fn default() -> Self {
+Self::new()
+}
+}
+
+impl Session {
+#[must_use]
+pub fn new() -> Self {
+let sql_parser_dialect =
+env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string());
+let state = SessionStateBuilder::new()
+.with_config(
+SessionConfig::new()
+.with_information_schema(true)
+.set_str("datafusion.sql_parser.dialect", &sql_parser_dialect),
+)
+.with_default_features()
+.with_query_planner(Arc::new(IcebergQueryPlanner {}))
+.with_type_planner(Arc::new(CustomTypePlanner {}))
+.build();
+let ctx = SessionContext::new_with_state(state);
+Self { ctx }
+}
+}
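The new `Session` wrapper is now the single place where the `SessionContext` picks up its SQL dialect, information schema, Iceberg query planner, and custom type planner. A hypothetical caller, assuming only what the code above shows:

```rust
use runtime::datafusion::session::Session;

fn main() {
    // With SQL_PARSER_DIALECT unset, the dialect defaults to "snowflake".
    let session = Session::default();

    // The wrapped context already carries the IcebergQueryPlanner and
    // CustomTypePlanner, so callers just hand `session.ctx` to whatever
    // needs a SessionContext (here: SqlExecutor::new in service.rs).
    assert!(session.ctx.catalog("no_such_catalog").is_none());
}
```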