From 83586db48a768173c87c4db5c022b979684c8d6f Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 22 Aug 2024 14:09:25 +0800 Subject: [PATCH 01/10] Initial --- Cargo.toml | 4 + crates/integrations/datafusion/Cargo.toml | 2 +- crates/sqllogictests/Cargo.toml | 25 ++ crates/sqllogictests/src/engine/conversion.rs | 101 +++++++ .../src/engine/datafusion/error.rs | 50 ++++ .../src/engine/datafusion/mod.rs | 26 ++ .../src/engine/datafusion/normalize.rs | 279 ++++++++++++++++++ .../src/engine/datafusion/runner.rs | 87 ++++++ crates/sqllogictests/src/engine/mod.rs | 16 + crates/sqllogictests/src/engine/output.rs | 57 ++++ .../sqllogictests/src/engine/sparksql/mod.rs | 30 ++ crates/sqllogictests/src/lib.rs | 21 ++ crates/sqllogictests/src/schedule.rs | 38 +++ 13 files changed, 735 insertions(+), 1 deletion(-) create mode 100644 crates/sqllogictests/Cargo.toml create mode 100644 crates/sqllogictests/src/engine/conversion.rs create mode 100644 crates/sqllogictests/src/engine/datafusion/error.rs create mode 100644 crates/sqllogictests/src/engine/datafusion/mod.rs create mode 100644 crates/sqllogictests/src/engine/datafusion/normalize.rs create mode 100644 crates/sqllogictests/src/engine/datafusion/runner.rs create mode 100644 crates/sqllogictests/src/engine/mod.rs create mode 100644 crates/sqllogictests/src/engine/output.rs create mode 100644 crates/sqllogictests/src/engine/sparksql/mod.rs create mode 100644 crates/sqllogictests/src/lib.rs create mode 100644 crates/sqllogictests/src/schedule.rs diff --git a/Cargo.toml b/Cargo.toml index 32034f1ab9..4c8722ea1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "crates/iceberg", "crates/integrations/*", "crates/test_utils", + "crates/sqllogictests", ] exclude = ["bindings/python"] @@ -56,6 +57,8 @@ bytes = "1.5" chrono = "0.4.34" ctor = "0.2.8" derive_builder = "0.20" +datafusion = { version = "41.0.0" } +datafusion-common = { version = "41.0.0" } either = "1" env_logger = "0.11.0" fnv = "1" @@ -84,6 +87,7 @@ serde_derive = "1" serde_json = "1" serde_repr = "0.1.16" serde_with = "3.4" +sqlparser = { version = "0.50.0", features = ["visitor"] } tempfile = "3.8" tokio = { version = "1", default-features = false } typed-builder = "0.19" diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 87e809cec0..6a8cf00f92 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -31,7 +31,7 @@ keywords = ["iceberg", "integrations", "datafusion"] [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -datafusion = { version = "41.0.0" } +datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } tokio = { workspace = true } diff --git a/crates/sqllogictests/Cargo.toml b/crates/sqllogictests/Cargo.toml new file mode 100644 index 0000000000..f5667e24db --- /dev/null +++ b/crates/sqllogictests/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "sqllogictests" +version.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true + +[dependencies] +arrow-schema = { workspace = true } +arrow-array= { workspace = true } +async-trait = { workspace = true } +sqllogictest = "0.21.0" +datafusion = { workspace = true, default-features = true} +datafusion-common = { workspace = true, default-features = true} +thiserror = "1.0.63" +sqlparser = {workspace = true} +itertools = "0.13.0" +half = "2.4.1" 
+bigdecimal = "0.4.1" +rust_decimal = { version = "1.27.0" } +tempfile = { workspace = true } +log = "0.4.22" +tokio = "1.38.0" \ No newline at end of file diff --git a/crates/sqllogictests/src/engine/conversion.rs b/crates/sqllogictests/src/engine/conversion.rs new file mode 100644 index 0000000000..937f43520b --- /dev/null +++ b/crates/sqllogictests/src/engine/conversion.rs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType}; +use bigdecimal::BigDecimal; +use datafusion_common::arrow::datatypes::i256; +use half::f16; +use rust_decimal::prelude::*; + +/// Represents a constant for NULL string in your database. +pub const NULL_STR: &str = "NULL"; + +pub(crate) fn bool_to_str(value: bool) -> String { + if value { + "true".to_string() + } else { + "false".to_string() + } +} + +pub(crate) fn varchar_to_str(value: &str) -> String { + if value.is_empty() { + "(empty)".to_string() + } else { + value.trim_end_matches('\n').to_string() + } +} + +pub(crate) fn f16_to_str(value: f16) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f16::INFINITY { + "Infinity".to_string() + } else if value == f16::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn f32_to_str(value: f32) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f32::INFINITY { + "Infinity".to_string() + } else if value == f32::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn f64_to_str(value: f64) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. 
+ "NaN".to_string() + } else if value == f64::INFINITY { + "Infinity".to_string() + } else if value == f64::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String { + big_decimal_to_str( + BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale)) + .unwrap(), + ) +} + +pub(crate) fn i256_to_str(value: i256, precision: &u8, scale: &i8) -> String { + big_decimal_to_str( + BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale)) + .unwrap(), + ) +} + +pub(crate) fn big_decimal_to_str(value: BigDecimal) -> String { + value.round(12).normalized().to_string() +} \ No newline at end of file diff --git a/crates/sqllogictests/src/engine/datafusion/error.rs b/crates/sqllogictests/src/engine/datafusion/error.rs new file mode 100644 index 0000000000..ec479516cd --- /dev/null +++ b/crates/sqllogictests/src/engine/datafusion/error.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::ArrowError; +use datafusion_common::DataFusionError; +use sqllogictest::TestError; +use sqlparser::parser::ParserError; +use thiserror::Error; + +pub type Result = std::result::Result; + +/// DataFusion sql-logicaltest error +#[derive(Debug, Error)] +pub enum DFSqlLogicTestError { + /// Error from sqllogictest-rs + #[error("SqlLogicTest error(from sqllogictest-rs crate): {0}")] + SqlLogicTest(#[from] TestError), + /// Error from datafusion + #[error("DataFusion error: {0}")] + DataFusion(#[from] DataFusionError), + /// Error returned when SQL is syntactically incorrect. + #[error("SQL Parser error: {0}")] + Sql(#[from] ParserError), + /// Error from arrow-rs + #[error("Arrow error: {0}")] + Arrow(#[from] ArrowError), + /// Generic error + #[error("Other Error: {0}")] + Other(String), +} + +impl From for DFSqlLogicTestError { + fn from(value: String) -> Self { + DFSqlLogicTestError::Other(value) + } +} \ No newline at end of file diff --git a/crates/sqllogictests/src/engine/datafusion/mod.rs b/crates/sqllogictests/src/engine/datafusion/mod.rs new file mode 100644 index 0000000000..da2c218585 --- /dev/null +++ b/crates/sqllogictests/src/engine/datafusion/mod.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
diff --git a/crates/sqllogictests/src/engine/datafusion/mod.rs b/crates/sqllogictests/src/engine/datafusion/mod.rs
new file mode 100644
index 0000000000..da2c218585
--- /dev/null
+++ b/crates/sqllogictests/src/engine/datafusion/mod.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+/// DataFusion engine implementation for sqllogictest.
+
+mod error;
+mod normalize;
+mod runner;
+
+pub use error::*;
+pub use runner::*;
\ No newline at end of file
diff --git a/crates/sqllogictests/src/engine/datafusion/normalize.rs b/crates/sqllogictests/src/engine/datafusion/normalize.rs
new file mode 100644
index 0000000000..d4e883d21d
--- /dev/null
+++ b/crates/sqllogictests/src/engine/datafusion/normalize.rs
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::array;
+use arrow_schema::{DataType, Fields};
+use datafusion_common::arrow::util::display::ArrayFormatter;
+use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
+use datafusion_common::DataFusionError;
+use std::path::PathBuf;
+use std::sync::OnceLock;
+use arrow_array::{ArrayRef, RecordBatch};
+use crate::engines::output::DFColumnType;
+
+use super::super::conversion::*;
+use super::error::{DFSqlLogicTestError, Result};
+
+/// Converts `batches` to a result as expected by sqllogictest.
+pub(crate) fn convert_batches(batches: Vec<RecordBatch>) -> Result<Vec<Vec<String>>> {
+    if batches.is_empty() {
+        Ok(vec![])
+    } else {
+        let schema = batches[0].schema();
+        let mut rows = vec![];
+        for batch in batches {
+            // Verify schema
+            if !schema.contains(&batch.schema()) {
+                return Err(DFSqlLogicTestError::DataFusion(DataFusionError::Internal(
+                    format!(
+                        "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
+                        &schema,
+                        batch.schema()
+                    ),
+                )));
+            }
+
+            let new_rows = convert_batch(batch)?
+                .into_iter()
+                .flat_map(expand_row)
+                .map(normalize_paths);
+            rows.extend(new_rows);
+        }
+        Ok(rows)
+    }
+}
+
+/// Special-case rows that have newlines in them (like explain plans).
+///
+/// Transform inputs like:
+/// ```text
+/// [
+///   "logical_plan",
+///   "Sort: d.b ASC NULLS LAST\n  Projection: d.b, MAX(d.a) AS max_a",
+/// ]
+/// ```
+///
+/// Into one cell per line, adding lines if necessary
+/// ```text
+/// [
+///   "logical_plan",
+/// ]
+/// [
+///   "Sort: d.b ASC NULLS LAST",
+/// ]
+/// [ <--- newly added row
+///   "|-- Projection: d.b, MAX(d.a) AS max_a",
+/// ]
+/// ```
fn expand_row(mut row: Vec<String>) -> impl Iterator<Item = Vec<String>> {
+    use itertools::Either;
+    use std::iter::once;
+
+    // check last cell
+    if let Some(cell) = row.pop() {
+        let lines: Vec<_> = cell.split('\n').collect();
+
+        // no newlines in last cell
+        if lines.len() < 2 {
+            row.push(cell);
+            return Either::Left(once(row));
+        }
+
+        // form new rows with each additional line
+        let new_lines: Vec<_> = lines
+            .into_iter()
+            .enumerate()
+            .map(|(idx, l)| {
+                // replace any leading spaces with '-' as
+                // `sqllogictest` ignores whitespace differences
+                //
+                // See https://github.com/apache/datafusion/issues/6328
+                let content = l.trim_start();
+                let new_prefix = "-".repeat(l.len() - content.len());
+                // maintain for each line a number, so
+                // reviewing explain result changes is easier
+                let line_num = idx + 1;
+                vec![format!("{line_num:02}){new_prefix}{content}")]
+            })
+            .collect();
+
+        Either::Right(once(row).chain(new_lines))
+    } else {
+        Either::Left(once(row))
+    }
+}
+
+/// normalize path references
+///
+/// ```text
+/// CsvExec: files={1 group: [[path/to/datafusion/testing/data/csv/aggregate_test_100.csv]]}, ...
+/// ```
+///
+/// into:
+///
+/// ```text
+/// CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, ...
+/// ```
+fn normalize_paths(mut row: Vec<String>) -> Vec<String> {
+    row.iter_mut().for_each(|s| {
+        let workspace_root: &str = workspace_root().as_ref();
+        if s.contains(workspace_root) {
+            *s = s.replace(workspace_root, "WORKSPACE_ROOT");
+        }
+    });
+    row
+}
+
+/// Return the location of the repository checkout
+fn workspace_root() -> &'static object_store::path::Path {
+    static WORKSPACE_ROOT_LOCK: OnceLock<object_store::path::Path> = OnceLock::new();
+    WORKSPACE_ROOT_LOCK.get_or_init(|| {
+        // e.g. /Software/datafusion/datafusion/core
+        let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+
+        // e.g. /Software/datafusion/datafusion
+        let workspace_root = dir
+            .parent()
+            .expect("Can not find parent of datafusion/core")
+            // e.g. /Software/datafusion
+            .parent()
+            .expect("parent of datafusion")
+            .to_string_lossy();
+
+        let sanitized_workspace_root = if cfg!(windows) {
+            // Object store paths are delimited with `/`, e.g. `/datafusion/datafusion/testing/data/csv/aggregate_test_100.csv`.
+            // The default windows delimiter is `\`, so the workspace path is `datafusion\datafusion`.
+            workspace_root
+                .replace(std::path::MAIN_SEPARATOR, object_store::path::DELIMITER)
+        } else {
+            workspace_root.to_string()
+        };
+
+        object_store::path::Path::parse(sanitized_workspace_root).unwrap()
+    })
+}
+
+/// Convert a single batch to a `Vec<Vec<String>>` for comparison
+fn convert_batch(batch: RecordBatch) -> Result<Vec<Vec<String>>> {
+    (0..batch.num_rows())
+        .map(|row| {
+            batch
+                .columns()
+                .iter()
+                .map(|col| cell_to_string(col, row))
+                .collect::<Result<Vec<String>>>()
+        })
+        .collect()
+}
+
+macro_rules! get_row_value {
get_row_value { + ($array_type:ty, $column: ident, $row: ident) => {{ + let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + + array.value($row) + }}; +} + +/// Normalizes the content of a single cell in RecordBatch prior to printing. +/// +/// This is to make the output comparable to the semi-standard .slt format +/// +/// Normalizations applied to [NULL Values and empty strings] +/// +/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings +/// +/// Floating numbers are rounded to have a consistent representation with the Postgres runner. +/// +pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { + if !col.is_valid(row) { + // represent any null value with the string "NULL" + Ok(NULL_STR.to_string()) + } else { + match col.data_type() { + DataType::Null => Ok(NULL_STR.to_string()), + DataType::Boolean => { + Ok(bool_to_str(get_row_value!(array::BooleanArray, col, row))) + } + DataType::Float16 => { + Ok(f16_to_str(get_row_value!(array::Float16Array, col, row))) + } + DataType::Float32 => { + Ok(f32_to_str(get_row_value!(array::Float32Array, col, row))) + } + DataType::Float64 => { + Ok(f64_to_str(get_row_value!(array::Float64Array, col, row))) + } + DataType::Decimal128(precision, scale) => { + let value = get_row_value!(array::Decimal128Array, col, row); + Ok(i128_to_str(value, precision, scale)) + } + DataType::Decimal256(precision, scale) => { + let value = get_row_value!(array::Decimal256Array, col, row); + Ok(i256_to_str(value, precision, scale)) + } + DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!( + array::LargeStringArray, + col, + row + ))), + DataType::Utf8 => { + Ok(varchar_to_str(get_row_value!(array::StringArray, col, row))) + } + DataType::Utf8View => Ok(varchar_to_str(get_row_value!( + array::StringViewArray, + col, + row + ))), + _ => { + let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); + Ok(f.unwrap().value(row).to_string()) + } + } + .map_err(DFSqlLogicTestError::Arrow) + } +} + +/// Converts columns to a result as expected by sqllogicteset. +pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec { + columns + .iter() + .map(|f| f.data_type()) + .map(|data_type| match data_type { + DataType::Boolean => DFColumnType::Boolean, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => DFColumnType::Integer, + DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => DFColumnType::Float, + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + DFColumnType::Text + } + DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) => DFColumnType::DateTime, + DataType::Timestamp(_, _) => DFColumnType::Timestamp, + _ => DFColumnType::Another, + }) + .collect() +} \ No newline at end of file diff --git a/crates/sqllogictests/src/engine/datafusion/runner.rs b/crates/sqllogictests/src/engine/datafusion/runner.rs new file mode 100644 index 0000000000..e39f62b1be --- /dev/null +++ b/crates/sqllogictests/src/engine/datafusion/runner.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
diff --git a/crates/sqllogictests/src/engine/datafusion/runner.rs b/crates/sqllogictests/src/engine/datafusion/runner.rs
new file mode 100644
index 0000000000..e39f62b1be
--- /dev/null
+++ b/crates/sqllogictests/src/engine/datafusion/runner.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::{path::PathBuf, time::Duration};
+use arrow_array::RecordBatch;
+use async_trait::async_trait;
+use datafusion::physical_plan::common::collect;
+use datafusion::physical_plan::execute_stream;
+use datafusion::prelude::SessionContext;
+use log::info;
+use sqllogictest::DBOutput;
+
+use super::{error::Result, normalize, DFSqlLogicTestError};
+
+use crate::engine::output::{DFColumnType, DFOutput};
+
+pub struct DataFusionEngine {
+    ctx: SessionContext,
+    relative_path: PathBuf,
+}
+
+impl DataFusionEngine {
+    pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self {
+        Self { ctx, relative_path }
+    }
+}
+
+#[async_trait]
+impl sqllogictest::AsyncDB for DataFusionEngine {
+    type Error = DFSqlLogicTestError;
+    type ColumnType = DFColumnType;
+
+    async fn run(&mut self, sql: &str) -> Result<DFOutput> {
+        info!(
+            "[{}] Running query: \"{}\"",
+            self.relative_path.display(),
+            sql
+        );
+        run_query(&self.ctx, sql).await
+    }
+
+    /// Engine name of current database.
+    fn engine_name(&self) -> &str {
+        "DataFusion"
+    }
+
+    /// [`DataFusionEngine`] calls this function to perform sleep.
+    ///
+    /// The default implementation is `std::thread::sleep`, which is universal to any async runtime
+    /// but would block the current thread. If you are running in tokio runtime, you should override
+    /// this by `tokio::time::sleep`.
+    async fn sleep(dur: Duration) {
+        tokio::time::sleep(dur).await;
+    }
+}
+
+async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DFOutput> {
+    let df = ctx.sql(sql.into().as_str()).await?;
+    let task_ctx = Arc::new(df.task_ctx());
+    let plan = df.create_physical_plan().await?;
+
+    let stream = execute_stream(plan, task_ctx)?;
+    let types = normalize::convert_schema_to_types(stream.schema().fields());
+    let results: Vec<RecordBatch> = collect(stream).await?;
+    let rows = normalize::convert_batches(results)?;
+
+    if rows.is_empty() && types.is_empty() {
+        Ok(DBOutput::StatementComplete(0))
+    } else {
+        Ok(DBOutput::Rows { types, rows })
+    }
+}
\ No newline at end of file
diff --git a/crates/sqllogictests/src/engine/mod.rs b/crates/sqllogictests/src/engine/mod.rs
new file mode 100644
index 0000000000..ba2a1f4151
--- /dev/null
+++ b/crates/sqllogictests/src/engine/mod.rs
@@ -0,0 +1,16 @@
+mod datafusion;
+
+use std::sync::Arc;
+pub use datafusion::*;
+
+mod sparksql;
+mod conversion;
+mod output;
+
+pub use sparksql::*;
+
+
+pub enum Engine {
+    DataFusion(Arc<DataFusionEngine>),
+    SparkSQL(Arc<SparkSqlEngine>),
+}
\ No newline at end of file
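At this stage the engine can already be driven by hand; a sketch of a one-off run (not part of the patch; assumes the imports from runner.rs and an async context):

use sqllogictest::AsyncDB;

// Hypothetical smoke test against an empty session.
let ctx = SessionContext::new();
let mut engine = DataFusionEngine::new(ctx, PathBuf::from("demo.slt"));
let _output = engine.run("SELECT 1").await?;

The Schedule type introduced below is the intended driver for running whole .slt files across engines.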
diff --git a/crates/sqllogictests/src/engine/output.rs b/crates/sqllogictests/src/engine/output.rs
new file mode 100644
index 0000000000..ae1030ca4a
--- /dev/null
+++ b/crates/sqllogictests/src/engine/output.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use sqllogictest::{ColumnType, DBOutput};
+
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum DFColumnType {
+    Boolean,
+    DateTime,
+    Integer,
+    Float,
+    Text,
+    Timestamp,
+    Another,
+}
+
+impl ColumnType for DFColumnType {
+    fn from_char(value: char) -> Option<Self> {
+        match value {
+            'B' => Some(Self::Boolean),
+            'D' => Some(Self::DateTime),
+            'I' => Some(Self::Integer),
+            'P' => Some(Self::Timestamp),
+            'R' => Some(Self::Float),
+            'T' => Some(Self::Text),
+            _ => Some(Self::Another),
+        }
+    }
+
+    fn to_char(&self) -> char {
+        match self {
+            Self::Boolean => 'B',
+            Self::DateTime => 'D',
+            Self::Integer => 'I',
+            Self::Timestamp => 'P',
+            Self::Float => 'R',
+            Self::Text => 'T',
+            Self::Another => '?',
+        }
+    }
+}
+
+pub(crate) type DFOutput = DBOutput<DFColumnType>;
\ No newline at end of file
diff --git a/crates/sqllogictests/src/engine/sparksql/mod.rs b/crates/sqllogictests/src/engine/sparksql/mod.rs
new file mode 100644
index 0000000000..aeff2616e8
--- /dev/null
+++ b/crates/sqllogictests/src/engine/sparksql/mod.rs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use sqllogictest::{AsyncDB, DBOutput};
+
+/// SparkSql engine implementation for sqllogictest.
+pub struct SparkSqlEngine;
+
+impl AsyncDB for SparkSqlEngine {
+    type Error = ();
+    type ColumnType = ();
+
+    async fn run(&mut self, sql: &str) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
+        todo!()
+    }
+}
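These single-character codes are exactly what .slt scripts put after the query keyword (`query ITR` declares an Integer, a Text, and a Float column). The mapping round-trips (sketch, not part of the patch):

use sqllogictest::ColumnType as _;

assert_eq!(DFColumnType::from_char('I'), Some(DFColumnType::Integer));
assert_eq!(DFColumnType::Integer.to_char(), 'I');
// Unknown characters degrade to Another ('?') rather than failing.
assert_eq!(DFColumnType::from_char('X'), Some(DFColumnType::Another));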
diff --git a/crates/sqllogictests/src/lib.rs b/crates/sqllogictests/src/lib.rs
new file mode 100644
index 0000000000..d01802f61d
--- /dev/null
+++ b/crates/sqllogictests/src/lib.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This lib contains code copied from
+// [Apache Datafusion](https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest)
+mod engine;
+mod schedule;
diff --git a/crates/sqllogictests/src/schedule.rs b/crates/sqllogictests/src/schedule.rs
new file mode 100644
index 0000000000..26e789bb09
--- /dev/null
+++ b/crates/sqllogictests/src/schedule.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::path::Path;
+use crate::engine::Engine;
+
+/// Schedule of engines to run tests.
+pub struct Schedule {
+    /// Map of engine names to engine instances.
+    engines: HashMap<String, Engine>,
+    /// List of steps to run, each step is a sql file.
+    steps: Vec<String>,
+}
+
+impl Schedule {
+    pub async fn parse<P: AsRef<Path>>(_schedule_def_file: P) -> Self {
+        todo!()
+    }
+
+    pub async fn run(mut self) {
+        todo!()
+    }
+}
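The intended top-level flow for a schedule looks like this (sketch against the stub API above; the todo!()s panic until the next patch fills them in):

let schedule = Schedule::parse("testdata/schedules/demo.toml").await;
schedule.run().await;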
From 02de788e07ee67597452e2498ed4837f4daac138 Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Fri, 23 Aug 2024 19:02:34 +0800
Subject: [PATCH 02/10] Initial

---
 Cargo.toml                                    |   2 +-
 .../Cargo.toml                                |  13 +-
 crates/sqllogictest/bin/sqllogictests.rs      |   0
 .../src/engine/conversion.rs                  |   0
 .../src/engine/datafusion.rs}                 |  32 ++---
 crates/sqllogictest/src/engine/mod.rs         |  79 +++++++++
 .../src/engine}/normalize.rs                  |   6 +-
 .../src/engine/output.rs                      |   0
 crates/sqllogictest/src/engine/spark.rs       |  77 ++++++++++
 .../src/lib.rs                                |   0
 crates/sqllogictest/src/schedule.rs           | 131 ++++++++++++++++++
 .../sqllogictest/testdata/schedules/demo.toml |  12 ++
 .../src/engine/datafusion/error.rs            |  50 ------
 .../src/engine/datafusion/mod.rs              |  26 ----
 crates/sqllogictests/src/engine/mod.rs        |  16 ---
 .../sqllogictests/src/engine/sparksql/mod.rs  |  30 ----
 crates/sqllogictests/src/schedule.rs          |  38 -----
 17 files changed, 330 insertions(+), 182 deletions(-)
 rename crates/{sqllogictests => sqllogictest}/Cargo.toml (76%)
 create mode 100644 crates/sqllogictest/bin/sqllogictests.rs
 rename crates/{sqllogictests => sqllogictest}/src/engine/conversion.rs (100%)
 rename crates/{sqllogictests/src/engine/datafusion/runner.rs => sqllogictest/src/engine/datafusion.rs} (82%)
 create mode 100644 crates/sqllogictest/src/engine/mod.rs
 rename crates/{sqllogictests/src/engine/datafusion => sqllogictest/src/engine}/normalize.rs (98%)
 rename crates/{sqllogictests => sqllogictest}/src/engine/output.rs (100%)
 create mode 100644 crates/sqllogictest/src/engine/spark.rs
 rename crates/{sqllogictests => sqllogictest}/src/lib.rs (100%)
 create mode 100644 crates/sqllogictest/src/schedule.rs
 create mode 100644 crates/sqllogictest/testdata/schedules/demo.toml
 delete mode 100644 crates/sqllogictests/src/engine/datafusion/error.rs
 delete mode 100644 crates/sqllogictests/src/engine/datafusion/mod.rs
 delete mode 100644 crates/sqllogictests/src/engine/mod.rs
 delete mode 100644 crates/sqllogictests/src/engine/sparksql/mod.rs
 delete mode 100644 crates/sqllogictests/src/schedule.rs

diff --git a/Cargo.toml b/Cargo.toml
index 4c8722ea1d..e3f431e588 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,7 +23,7 @@ members = [
   "crates/iceberg",
   "crates/integrations/*",
   "crates/test_utils",
-  "crates/sqllogictests",
+  "crates/sqllogictest",
 ]
 exclude = ["bindings/python"]
diff --git a/crates/sqllogictests/Cargo.toml b/crates/sqllogictest/Cargo.toml
similarity index 76%
rename from crates/sqllogictests/Cargo.toml
rename to crates/sqllogictest/Cargo.toml
index f5667e24db..21388e53de 100644
--- a/crates/sqllogictests/Cargo.toml
+++ b/crates/sqllogictest/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "sqllogictests"
+name = "sqllogictest"
 version.workspace = true
 edition.workspace = true
 homepage.workspace = true
@@ -22,4 +22,13 @@ bigdecimal = "0.4.1"
 rust_decimal = { version = "1.27.0" }
 tempfile = { workspace = true }
 log = "0.4.22"
-tokio = "1.38.0"
\ No newline at end of file
+tokio = "1.38.0"
+spark-connect-rs = "0.0.1-beta.5"
+anyhow = { workspace = true }
+toml = "0.8.19"
+url = { workspace = true }
+
+[[test]]
+harness = false
+name = "sqllogictests"
+path = "bin/sqllogictests.rs"
\ No newline at end of file
diff --git a/crates/sqllogictest/bin/sqllogictests.rs b/crates/sqllogictest/bin/sqllogictests.rs
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/crates/sqllogictests/src/engine/conversion.rs b/crates/sqllogictest/src/engine/conversion.rs
similarity index 100%
rename from crates/sqllogictests/src/engine/conversion.rs
rename to crates/sqllogictest/src/engine/conversion.rs
diff --git a/crates/sqllogictests/src/engine/datafusion/runner.rs b/crates/sqllogictest/src/engine/datafusion.rs
similarity index 82%
rename from crates/sqllogictests/src/engine/datafusion/runner.rs
rename to crates/sqllogictest/src/engine/datafusion.rs
index e39f62b1be..7f881c4c62 100644
--- a/crates/sqllogictests/src/engine/datafusion/runner.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -21,36 +21,36 @@ use arrow_array::RecordBatch;
 use async_trait::async_trait;
 use datafusion::physical_plan::common::collect;
 use datafusion::physical_plan::execute_stream;
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{SessionConfig, SessionContext};
 use log::info;
 use sqllogictest::DBOutput;
 
-use super::{error::Result, normalize, DFSqlLogicTestError};
-
 use crate::engine::output::{DFColumnType, DFOutput};
+use crate::engine::normalize;
 
 pub struct DataFusionEngine {
     ctx: SessionContext,
-    relative_path: PathBuf,
 }
 
-impl DataFusionEngine {
-    pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self {
-        Self { ctx, relative_path }
+impl Default for DataFusionEngine {
+    fn default() -> Self {
+        let config = SessionConfig::new()
+            .with_target_partitions(4);
+
+        let ctx = SessionContext::new_with_config(config);
+
+        Self {
+            ctx
+        }
     }
 }
 
 #[async_trait]
 impl sqllogictest::AsyncDB for DataFusionEngine {
-    type Error = DFSqlLogicTestError;
+    type Error = anyhow::Error;
     type ColumnType = DFColumnType;
 
-    async fn run(&mut self, sql: &str) -> Result<DFOutput> {
-        info!(
-            "[{}] Running query: \"{}\"",
-            self.relative_path.display(),
-            sql
-        );
+    async fn run(&mut self, sql: &str) -> anyhow::Result<DFOutput> {
         run_query(&self.ctx, sql).await
     }
 
@@ -69,7 +69,7 @@ impl sqllogictest::AsyncDB for DataFusionEngine {
     }
 }
 
-async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DFOutput> {
+async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> anyhow::Result<DFOutput> {
     let df = ctx.sql(sql.into().as_str()).await?;
     let task_ctx = Arc::new(df.task_ctx());
     let plan = df.create_physical_plan().await?;
@@ -84,4 +84,4 @@ async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> anyhow::Result<DFOutput>
         Ok(DBOutput::Rows { types, rows })
     }
 }
-}
\ No newline at end of file
+}
diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs
new file mode 100644
index 0000000000..67dee7ce90
--- /dev/null
+++ b/crates/sqllogictest/src/engine/mod.rs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod conversion;
+mod datafusion;
+mod normalize;
+mod output;
+mod spark;
+
+use std::sync::Arc;
+
+use anyhow::anyhow;
+pub use datafusion::*;
+pub use spark::*;
+use output::DFColumnType;
+use sqllogictest::{strict_column_validator, AsyncDB, MakeConnection, Runner};
+use toml::Table;
+
+#[derive(Clone)]
+pub enum Engine {
+    DataFusion,
+    SparkSQL(Arc<Table>),
+}
+
+impl Engine {
+    pub async fn new(typ: &str, configs: &Table) -> anyhow::Result<Engine> {
+        match typ {
+            "spark" => {
+                Ok(Engine::SparkSQL(Arc::new(configs.clone())))
+            }
+            "datafusion" => {
+                Ok(Engine::DataFusion)
+            }
+            other => Err(anyhow!("Unknown engine type: {other}"))
+        }
+    }
+
+    pub async fn run_slt_file(self, slt_file: impl Into<String>) -> anyhow::Result<()> {
+        let absolute_file = format!("{}/testdata/slts/{}", env!("CARGO_MANIFEST_DIR"), slt_file.into());
+
+        match self {
+            Engine::DataFusion => {
+                let runner = Runner::new(|| async { Ok(DataFusionEngine::default()) });
+                Self::run_with_runner(runner, absolute_file).await
+            }
+            Engine::SparkSQL(t) => {
+                let configs = t.clone();
+                let runner = Runner::new(move || {
+                    let configs = configs.clone();
+                    async move { SparkSqlEngine::new(&configs).await }
+                });
+                Self::run_with_runner(runner, absolute_file).await
+            }
+        }
+    }
+
+    async fn run_with_runner<D, M>(mut runner: Runner<D, M>, slt_file: String) -> anyhow::Result<()>
+    where
+        D: AsyncDB<ColumnType = DFColumnType, Error = anyhow::Error>,
+        M: MakeConnection<Conn = D>,
+    {
+        runner.with_column_validator(strict_column_validator);
+        Ok(runner
+            .run_file_async(slt_file)
+            .await?)
+    }
+}
\ No newline at end of file
diff --git a/crates/sqllogictests/src/engine/datafusion/normalize.rs b/crates/sqllogictest/src/engine/normalize.rs
similarity index 98%
rename from crates/sqllogictests/src/engine/datafusion/normalize.rs
rename to crates/sqllogictest/src/engine/normalize.rs
index d4e883d21d..7ca16fa9ab 100644
--- a/crates/sqllogictests/src/engine/datafusion/normalize.rs
+++ b/crates/sqllogictest/src/engine/normalize.rs
@@ -21,10 +21,10 @@ use datafusion_common::DataFusionError;
 use std::path::PathBuf;
 use std::sync::OnceLock;
 use arrow_array::{ArrayRef, RecordBatch};
-use crate::engines::output::DFColumnType;
+use crate::engine::output::DFColumnType;
 
-use super::super::conversion::*;
-use super::error::{DFSqlLogicTestError, Result};
+use crate::engine::conversion::*;
+use crate::engine::datafusion::error::{DFSqlLogicTestError, Result};
 
 /// Converts `batches` to a result as expected by sqllogictest.
 pub(crate) fn convert_batches(batches: Vec<RecordBatch>) -> Result<Vec<Vec<String>>> {
diff --git a/crates/sqllogictests/src/engine/output.rs b/crates/sqllogictest/src/engine/output.rs
similarity index 100%
rename from crates/sqllogictests/src/engine/output.rs
rename to crates/sqllogictest/src/engine/output.rs
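The Runner factories above rely on sqllogictest's MakeConnection: a closure invoked per file that resolves to a fresh AsyncDB connection. On stable Rust that is spelled as a closure returning an async block (sketch, mirroring run_slt_file above):

let mut runner = Runner::new(|| async { Ok(DataFusionEngine::default()) });
runner.with_column_validator(strict_column_validator);
runner.run_file_async("testdata/slts/demo/verify.slt").await?;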
diff --git a/crates/sqllogictest/src/engine/spark.rs b/crates/sqllogictest/src/engine/spark.rs
new file mode 100644
index 0000000000..d7772fdbff
--- /dev/null
+++ b/crates/sqllogictest/src/engine/spark.rs
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::engine::normalize;
+use crate::engine::output::DFColumnType;
+use anyhow::anyhow;
+use spark_connect_rs::{SparkSession, SparkSessionBuilder};
+use sqllogictest::{AsyncDB, DBOutput};
+use std::time::Duration;
+use toml::Table;
+
+/// SparkSql engine implementation for sqllogictest.
+pub struct SparkSqlEngine {
+    session: SparkSession,
+}
+
+impl AsyncDB for SparkSqlEngine {
+    type Error = anyhow::Error;
+    type ColumnType = DFColumnType;
+
+    async fn run(&mut self, sql: &str) -> anyhow::Result<DBOutput<DFColumnType>> {
+        let results = self.session.sql(sql).await?.collect().await?;
+        let types = normalize::convert_schema_to_types(results.schema().fields());
+        let rows = normalize::convert_batches(vec![results])?;
+
+        if rows.is_empty() && types.is_empty() {
+            Ok(DBOutput::StatementComplete(0))
+        } else {
+            Ok(DBOutput::Rows { types, rows })
+        }
+    }
+
+    /// Engine name of current database.
+    fn engine_name(&self) -> &str {
+        "SparkConnect"
+    }
+
+    /// [`SparkSqlEngine`] calls this function to perform sleep.
+    ///
+    /// The default implementation is `std::thread::sleep`, which is universal to any async runtime
+    /// but would block the current thread. If you are running in tokio runtime, you should override
+    /// this by `tokio::time::sleep`.
+    async fn sleep(dur: Duration) {
+        tokio::time::sleep(dur).await;
+    }
+}
+
+impl SparkSqlEngine {
+    pub async fn new(configs: &Table) -> anyhow::Result<Self> {
+        let url = configs.get("url")
+            .ok_or_else(|| anyhow!("url property doesn't exist for spark engine"))?
+            .as_str()
+            .ok_or_else(|| anyhow!("url property is not a string for spark engine"))?;
+
+        let session = SparkSessionBuilder::remote(url)
+            .app_name("SparkConnect")
+            .build()
+            .await?;
+
+        Ok(Self { session })
+    }
+}
diff --git a/crates/sqllogictests/src/lib.rs b/crates/sqllogictest/src/lib.rs
similarity index 100%
rename from crates/sqllogictests/src/lib.rs
rename to crates/sqllogictest/src/lib.rs
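Each engine receives its own entry from the schedule file's [engines] table. A sketch of what Engine::new sees for a spark entry (values illustrative, not from the patch):

let configs: Table = r#"
type = "spark"
url = "sc://localhost:15002"
"#
.parse()?;
let engine = Engine::new("spark", &configs).await?;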
diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs
new file mode 100644
index 0000000000..0a148bf222
--- /dev/null
+++ b/crates/sqllogictest/src/schedule.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fs::read_to_string;
+use std::path::Path;
+use anyhow::anyhow;
+use itertools::Itertools;
+use toml::{Table, Value};
+use crate::engine::Engine;
+
+/// Schedule of engines to run tests.
+pub struct Schedule {
+    /// Map of engine names to engine instances.
+    engines: HashMap<String, Engine>,
+    /// List of steps to run, each step is a sql file.
+    steps: Vec<Step>,
+}
+
+pub struct Step {
+    /// Name of engine to execute.
+    engine_name: String,
+    /// Name of sql file.
+    sql: String,
+}
+
+impl Schedule {
+    pub async fn parse<P: AsRef<Path>>(schedule_def_file: P) -> anyhow::Result<Self> {
+        let content = read_to_string(schedule_def_file)?;
+        let toml_value = content.parse::<Value>()?;
+        let table = toml_value.as_table()
+            .ok_or_else(|| anyhow::anyhow!("Schedule file must be a TOML table"))?;
+
+        let engines = Schedule::parse_engines(table).await?;
+        let steps = Schedule::parse_steps(table).await?;
+
+        Ok(Self {
+            engines,
+            steps
+        })
+    }
+
+    async fn parse_engines(table: &Table) -> anyhow::Result<HashMap<String, Engine>> {
+        let engines = table.get("engines")
+            .ok_or_else(|| anyhow::anyhow!("Schedule file must have an 'engines' table"))?
+            .as_table()
+            .ok_or_else(|| anyhow::anyhow!("'engines' must be a table"))?;
+
+        let mut result = HashMap::new();
+        for (name, engine_config) in engines {
+            let engine_configs = engine_config.as_table()
+                .ok_or_else(|| anyhow::anyhow!("Config of engine {name} is not a table"))?;
+
+            let typ = engine_configs.get("type")
+                .ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a 'type' field"))?
+                .as_str()
+                .ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a string"))?;
+
+            let engine = Engine::new(typ, engine_configs).await?;
+
+            result.insert(name.clone(), engine);
+        }
+
+        Ok(result)
+    }
+
+    async fn parse_steps(table: &Table) -> anyhow::Result<Vec<Step>> {
+        let steps = table.get("steps")
+            .ok_or_else(|| anyhow!("steps not found"))?
+            .as_array()
+            .ok_or_else(|| anyhow!("steps is not array"))?;
+
+        steps.iter().map(Schedule::parse_step)
+            .try_collect()
+    }
+
+    fn parse_step(value: &Value) -> anyhow::Result<Step> {
+        let t = value
+            .as_table()
+            .ok_or_else(|| anyhow!("Step must be a table!"))?;
+
+        let engine_name = t.get("engine")
+            .ok_or_else(|| anyhow!("Property engine is missing in step"))?
+            .as_str()
+            .ok_or_else(|| anyhow!("Property engine is not a string in step"))?
+            .to_string();
+
+        let sql = t.get("sql")
+            .ok_or_else(|| anyhow!("Property sql is missing in step"))?
+            .as_str()
+            .ok_or_else(|| anyhow!("Property sql is not a string in step"))?
+            .to_string();
+
+        Ok(Step {
+            engine_name,
+            sql,
+        })
+    }
+
+    pub async fn run(mut self) -> anyhow::Result<()> {
+        for step_idx in 0..self.steps.len() {
+            self.run_step(step_idx).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn run_step(&self, step_index: usize) -> anyhow::Result<()> {
+        let step = &self.steps[step_index];
+
+        let engine = self.engines.get(&step.engine_name)
+            .ok_or_else(|| anyhow!("Engine {} not found!", step.engine_name))?
+            .clone();
+
+        engine.run_slt_file(&step.sql).await
+    }
+}
diff --git a/crates/sqllogictest/testdata/schedules/demo.toml b/crates/sqllogictest/testdata/schedules/demo.toml
new file mode 100644
index 0000000000..1bae195cb9
--- /dev/null
+++ b/crates/sqllogictest/testdata/schedules/demo.toml
@@ -0,0 +1,12 @@
+[engines]
+spark = { type = "spark", url = "sc://localhost:7077" }
+df = { type = "datafusion" }
+
+[[steps]]
+engine = "spark"
+sql = "prepare1.sql"
+
+[[steps]]
+engine = "df"
+sql = "verify.sql"
+
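Running the schedule above end to end then looks like this (sketch; assumes a tokio runtime — the "macros" and "rt" features are not enabled by this crate's manifest, so a manually built runtime may be needed):

async fn run_demo() -> anyhow::Result<()> {
    let schedule = Schedule::parse(format!(
        "{}/testdata/schedules/demo.toml",
        env!("CARGO_MANIFEST_DIR")
    ))
    .await?;
    schedule.run().await
}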
diff --git a/crates/sqllogictests/src/engine/datafusion/error.rs b/crates/sqllogictests/src/engine/datafusion/error.rs
deleted file mode 100644
index ec479516cd..0000000000
--- a/crates/sqllogictests/src/engine/datafusion/error.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_schema::ArrowError;
-use datafusion_common::DataFusionError;
-use sqllogictest::TestError;
-use sqlparser::parser::ParserError;
-use thiserror::Error;
-
-pub type Result<T, E = DFSqlLogicTestError> = std::result::Result<T, E>;
-
-/// DataFusion sqllogictest error
-#[derive(Debug, Error)]
-pub enum DFSqlLogicTestError {
-    /// Error from sqllogictest-rs
-    #[error("SqlLogicTest error(from sqllogictest-rs crate): {0}")]
-    SqlLogicTest(#[from] TestError),
-    /// Error from DataFusion
-    #[error("DataFusion error: {0}")]
-    DataFusion(#[from] DataFusionError),
-    /// Error returned when SQL is syntactically incorrect.
-    #[error("SQL Parser error: {0}")]
-    Sql(#[from] ParserError),
-    /// Error from arrow-rs
-    #[error("Arrow error: {0}")]
-    Arrow(#[from] ArrowError),
-    /// Generic error
-    #[error("Other Error: {0}")]
-    Other(String),
-}
-
-impl From<String> for DFSqlLogicTestError {
-    fn from(value: String) -> Self {
-        DFSqlLogicTestError::Other(value)
-    }
-}
\ No newline at end of file
diff --git a/crates/sqllogictests/src/engine/datafusion/mod.rs b/crates/sqllogictests/src/engine/datafusion/mod.rs
deleted file mode 100644
index da2c218585..0000000000
--- a/crates/sqllogictests/src/engine/datafusion/mod.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-/// DataFusion engine implementation for sqllogictest.
-
-mod error;
-mod normalize;
-mod runner;
-
-pub use error::*;
-pub use runner::*;
\ No newline at end of file
diff --git a/crates/sqllogictests/src/engine/mod.rs b/crates/sqllogictests/src/engine/mod.rs
deleted file mode 100644
index ba2a1f4151..0000000000
--- a/crates/sqllogictests/src/engine/mod.rs
+++ /dev/null
@@ -1,16 +0,0 @@
-mod datafusion;
-
-use std::sync::Arc;
-pub use datafusion::*;
-
-mod sparksql;
-mod conversion;
-mod output;
-
-pub use sparksql::*;
-
-
-pub enum Engine {
-    DataFusion(Arc<DataFusionEngine>),
-    SparkSQL(Arc<SparkSqlEngine>),
-}
\ No newline at end of file
diff --git a/crates/sqllogictests/src/engine/sparksql/mod.rs b/crates/sqllogictests/src/engine/sparksql/mod.rs
deleted file mode 100644
index aeff2616e8..0000000000
--- a/crates/sqllogictests/src/engine/sparksql/mod.rs
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use sqllogictest::{AsyncDB, DBOutput};
-
-/// SparkSql engine implementation for sqllogictest.
-pub struct SparkSqlEngine;
-
-impl AsyncDB for SparkSqlEngine {
-    type Error = ();
-    type ColumnType = ();
-
-    async fn run(&mut self, sql: &str) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
-        todo!()
-    }
-}
diff --git a/crates/sqllogictests/src/schedule.rs b/crates/sqllogictests/src/schedule.rs
deleted file mode 100644
index 26e789bb09..0000000000
--- a/crates/sqllogictests/src/schedule.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashMap;
-use std::path::Path;
-use crate::engine::Engine;
-
-/// Schedule of engines to run tests.
-pub struct Schedule {
-    /// Map of engine names to engine instances.
-    engines: HashMap<String, Engine>,
-    /// List of steps to run, each step is a sql file.
-    steps: Vec<String>,
-}
-
-impl Schedule {
-    pub async fn parse<P: AsRef<Path>>(_schedule_def_file: P) -> Self {
-        todo!()
-    }
-
-    pub async fn run(mut self) {
-        todo!()
-    }
-}

From 434214a2ca76462c161b902fd9ab8928ffc936fc Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Sat, 24 Aug 2024 18:42:50 +0800
Subject: [PATCH 03/10] Code complete

---
 crates/sqllogictest/Cargo.toml                | 12 ++-
 crates/sqllogictest/bin/sqllogictests.rs      |  0
 crates/sqllogictest/src/engine/datafusion.rs  | 40 +++++++--
 crates/sqllogictest/src/engine/mod.rs         | 16 ++--
 crates/sqllogictest/src/lib.rs                |  2 +-
 .../testdata/docker/docker-compose.yaml       | 86 +++++++++++++++
 .../docker/spark/spark-connect-server.sh      | 27 ++++++
 .../sqllogictest/testdata/schedules/demo.toml |  4 +-
 .../testdata/slts/demo/prepare.slt            | 11 +++
 .../testdata/slts/demo/verify.slt             |  6 ++
 crates/sqllogictest/tests/sqllogictests.rs    | 78 +++++++++++++++
 11 files changed, 265 insertions(+), 17 deletions(-)
 delete mode 100644 crates/sqllogictest/bin/sqllogictests.rs
 create mode 100644 crates/sqllogictest/testdata/docker/docker-compose.yaml
 create mode 100755 crates/sqllogictest/testdata/docker/spark/spark-connect-server.sh
 create mode 100644 crates/sqllogictest/testdata/slts/demo/prepare.slt
 create mode 100644 crates/sqllogictest/testdata/slts/demo/verify.slt
 create mode 100644 crates/sqllogictest/tests/sqllogictests.rs

diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml
index 21388e53de..0b6509a54f 100644
--- a/crates/sqllogictest/Cargo.toml
+++ b/crates/sqllogictest/Cargo.toml
@@ -22,13 +22,21 @@ bigdecimal = "0.4.1"
 rust_decimal = { version = "1.27.0" }
 tempfile = { workspace = true }
 log = "0.4.22"
-tokio = "1.38.0"
 spark-connect-rs = "0.0.1-beta.5"
 anyhow = { workspace = true }
 toml = "0.8.19"
 url = { workspace = true }
+iceberg-datafusion = { path = "../integrations/datafusion" }
+iceberg-catalog-rest = { path = "../catalog/rest" }
+
+[dev-dependencies]
+tokio = "1.38.0"
+env_logger = { workspace = true }
+libtest-mimic = "0.7.3"
+iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
+
 [[test]]
 harness = false
 name = "sqllogictests"
-path = "bin/sqllogictests.rs"
\ No newline at end of file
+path = "tests/sqllogictests.rs"
\ No newline at end of file
diff --git a/crates/sqllogictest/bin/sqllogictests.rs b/crates/sqllogictest/bin/sqllogictests.rs
deleted file mode 100644
index e69de29bb2..0000000000
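The DataFusion side of the new dependencies is wired up in the next hunk: a REST catalog client wrapped in an IcebergCatalogProvider and registered on the session. Condensed (a sketch under the APIs this series uses; the URL is an assumption matching the compose file below):

let catalog = RestCatalog::new(
    RestCatalogConfig::builder()
        .uri("http://localhost:8181".to_string())
        .build(),
);
let provider = IcebergCatalogProvider::try_new(Arc::new(catalog)).await?;
ctx.register_catalog("demo", Arc::new(provider));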
diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs
index 7f881c4c62..4ad1c4775b 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -15,18 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-use std::{path::PathBuf, time::Duration};
 use arrow_array::RecordBatch;
 use async_trait::async_trait;
 use datafusion::physical_plan::common::collect;
 use datafusion::physical_plan::execute_stream;
 use datafusion::prelude::{SessionConfig, SessionContext};
-use log::info;
 use sqllogictest::DBOutput;
+use std::sync::Arc;
+use std::time::Duration;
+use anyhow::anyhow;
+use datafusion::catalog::CatalogProvider;
+use toml::Table;
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_datafusion::IcebergCatalogProvider;
 use crate::engine::normalize;
 use crate::engine::output::{DFColumnType, DFOutput};
@@ -85,3 +88,30 @@ async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> anyhow::Result<DFOutput>
         Ok(DBOutput::Rows { types, rows })
     }
 }
+
+impl DataFusionEngine {
+    pub async fn new(configs: &Table) -> anyhow::Result<Self> {
+        let config = SessionConfig::new()
+            .with_target_partitions(4);
+
+        let ctx = SessionContext::new_with_config(config);
+        ctx.register_catalog("demo", Self::create_catalog(configs).await?);
+
+        Ok(Self {
+            ctx
+        })
+    }
+
+    async fn create_catalog(configs: &Table) -> anyhow::Result<Arc<dyn CatalogProvider>> {
+        let rest_catalog_url = configs.get("url")
+            .ok_or_else(|| anyhow!("url not found for datafusion engine!"))?
+            .as_str()
+            .ok_or_else(|| anyhow!("url is not a string"))?;
+
+        let rest_catalog = RestCatalog::new(
+            RestCatalogConfig::builder()
+                .uri(rest_catalog_url.to_string())
+                .build(),
+        );
+
+        Ok(Arc::new(IcebergCatalogProvider::try_new(Arc::new(rest_catalog)).await?))
+    }
+}
diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs
index 67dee7ce90..05e90db9e3 100644
--- a/crates/sqllogictest/src/engine/mod.rs
+++ b/crates/sqllogictest/src/engine/mod.rs
@@ -34,18 +34,19 @@ pub use datafusion::*;
 
 #[derive(Clone)]
 pub enum Engine {
-    DataFusion,
+    DataFusion(Arc<Table>),
     SparkSQL(Arc<Table>),
 }
 
 impl Engine {
     pub async fn new(typ: &str, configs: &Table) -> anyhow::Result<Engine> {
+        let configs = Arc::new(configs.clone());
         match typ {
             "spark" => {
-                Ok(Engine::SparkSQL(Arc::new(configs.clone())))
+                Ok(Engine::SparkSQL(configs))
             }
             "datafusion" => {
-                Ok(Engine::DataFusion)
+                Ok(Engine::DataFusion(configs))
             }
             other => Err(anyhow!("Unknown engine type: {other}"))
         }
@@ -55,12 +56,13 @@ impl Engine {
         let absolute_file = format!("{}/testdata/slts/{}", env!("CARGO_MANIFEST_DIR"), slt_file.into());
 
         match self {
-            Engine::DataFusion => {
-                let runner = Runner::new(|| async { Ok(DataFusionEngine::default()) });
+            Engine::DataFusion(configs) => {
+                let configs = configs.clone();
+                let runner = Runner::new(move || {
+                    let configs = configs.clone();
+                    async move { DataFusionEngine::new(&configs).await }
+                });
                 Self::run_with_runner(runner, absolute_file).await
             }
-            Engine::SparkSQL(t) => {
-                let configs = t.clone();
+            Engine::SparkSQL(configs) => {
+                let configs = configs.clone();
                 let runner = Runner::new(move || {
                     let configs = configs.clone();
                     async move { SparkSqlEngine::new(&configs).await }
                 });
                 Self::run_with_runner(runner, absolute_file).await
             }
         }
     }
diff --git a/crates/sqllogictest/src/lib.rs b/crates/sqllogictest/src/lib.rs
index d01802f61d..c4afb16164 100644
--- a/crates/sqllogictest/src/lib.rs
+++ b/crates/sqllogictest/src/lib.rs
@@ -18,4 +18,4 @@
 // This lib contains code copied from
 // [Apache Datafusion](https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest)
 mod engine;
-mod schedule;
+pub mod schedule;
diff --git a/crates/sqllogictest/testdata/docker/docker-compose.yaml b/crates/sqllogictest/testdata/docker/docker-compose.yaml
new file mode 100644
index 0000000000..8c3e32f3ff
--- /dev/null
+++ b/crates/sqllogictest/testdata/docker/docker-compose.yaml
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ + +services: + rest: + image: tabulario/iceberg-rest:0.10.0 + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog + - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory + - CATALOG_WAREHOUSE=s3://icebergdata/demo + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + depends_on: + - minio +# networks: +# host: +# aliases: +# - icebergdata.minio + ports: + - "8181:8181" + + minio: + image: minio/minio:RELEASE.2024-03-07T00-43-48Z + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + hostname: icebergdata.minio + ports: + - "9001:9001" + - "9000:9000" + command: ["server", "/data", "--console-address", ":9001"] + + mc: + depends_on: + - minio + image: minio/mc:RELEASE.2024-03-07T00-31-49Z + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null " + + spark: + depends_on: + - rest + - minio + image: apache/spark:3.5.2-java17 + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + - SPARK_HOME=/opt/spark + - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin + user: root + links: + - minio:icebergdata.minio + ports: + - "15002:15002" + healthcheck: + test: netstat -ltn | grep -c 15002 + interval: 1s + retries: 1200 + volumes: + - ./spark:/spark-script + entrypoint: [ "/spark-script/spark-connect-server.sh" ] + diff --git a/crates/sqllogictest/testdata/docker/spark/spark-connect-server.sh b/crates/sqllogictest/testdata/docker/spark/spark-connect-server.sh new file mode 100755 index 0000000000..31c064c4e5 --- /dev/null +++ b/crates/sqllogictest/testdata/docker/spark/spark-connect-server.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +set -ex + +SPARK_VERSION="3.5.2" +ICEBERG_VERSION="1.6.0" + +PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:$ICEBERG_VERSION" +PACKAGES="$PACKAGES,org.apache.iceberg:iceberg-aws-bundle:$ICEBERG_VERSION" +PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION" + +/opt/spark/sbin/start-connect-server.sh \ + --packages $PACKAGES \ + --master local[3] \ + --conf spark.driver.extraJavaOptions="-Dlog4j.configuration=file:///spark-script/log4j2.properties" \ + --conf spark.driver.bindAddress=0.0.0.0 \ + --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ + --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.rest.RESTCatalog \ + --conf spark.sql.catalog.demo.uri=http://rest:8181 \ + --conf spark.sql.catalog.demo.s3.endpoint=http://minio:9000 \ + --conf spark.sql.catalog.demo.s3.path.style.access=true \ + --conf spark.sql.catalog.demo.s3.access.key=admin \ + --conf spark.sql.catalog.demo.s3.secret.key=password \ + --conf spark.sql.defaultCatalog=demo + +tail -f /opt/spark/logs/spark*.out \ No newline at end of file diff --git a/crates/sqllogictest/testdata/schedules/demo.toml b/crates/sqllogictest/testdata/schedules/demo.toml index 1bae195cb9..a795627420 100644 --- 
a/crates/sqllogictest/testdata/schedules/demo.toml
+++ b/crates/sqllogictest/testdata/schedules/demo.toml
@@ -1,10 +1,10 @@
 [engines]
 spark = { type = "spark", url = "sc://localhost:7077" }
-df = { type = "datafusion" }
+df = { type = "datafusion", url = "http://localhost:8181" }
 
 [[steps]]
 engine = "spark"
-filename = "prepare1.sql"
+filename = "prepare.slt"
 
 [[steps]]
 engine = "df"
diff --git a/crates/sqllogictest/testdata/slts/demo/prepare.slt b/crates/sqllogictest/testdata/slts/demo/prepare.slt
new file mode 100644
index 0000000000..4b0fd89860
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/demo/prepare.slt
@@ -0,0 +1,11 @@
+statement ok
+CREATE SCHEMA s1 IF NOT EXISTS;
+
+statement ok
+USE SCHEMA s1;
+
+statement ok
+CREATE TABLE t1 (id INTEGER);
+
+statement ok
+INSERT INTO t1 VALUES (1), (2), (3);
diff --git a/crates/sqllogictest/testdata/slts/demo/verify.slt b/crates/sqllogictest/testdata/slts/demo/verify.slt
new file mode 100644
index 0000000000..7c932222b8
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/demo/verify.slt
@@ -0,0 +1,6 @@
+query II rowsort
+SELECT * FROM s1.t1;
+----
+1
+2
+3
\ No newline at end of file
diff --git a/crates/sqllogictest/tests/sqllogictests.rs b/crates/sqllogictest/tests/sqllogictests.rs
new file mode 100644
index 0000000000..6a6939377b
--- /dev/null
+++ b/crates/sqllogictest/tests/sqllogictests.rs
@@ -0,0 +1,78 @@
+use std::fs;
+use std::path::PathBuf;
+use libtest_mimic::{Arguments, Trial};
+use tokio::runtime::Handle;
+use iceberg_test_utils::docker::DockerCompose;
+use sqllogictest::schedule::Schedule;
+
+fn main() {
+    env_logger::init();
+
+    log::info!("Starting docker compose...");
+    let docker = start_docker().unwrap();
+
+    log::info!("Starting tokio runtime...");
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    // Parse command line arguments
+    let args = Arguments::from_args();
+
+    log::info!("Creating tests...");
+    let tests = collect_trials(rt.handle().clone()).unwrap();
+
+    log::info!("Starting tests...");
+    // Run all tests and exit the application appropriately.
+    let result = libtest_mimic::run(&args, tests);
+
+    log::info!("Shutting down tokio runtime...");
+    drop(rt);
+    log::info!("Shutting down docker...");
+    drop(docker);
+
+    result.exit();
+}
+
+fn start_docker() -> anyhow::Result<DockerCompose> {
+    let docker = DockerCompose::new("sqllogictests",
+        format!("{}/testdata/docker", env!("CARGO_MANIFEST_DIR")));
+    docker.run();
+    Ok(docker)
+}
+
+fn collect_trials(handle: Handle) -> anyhow::Result<Vec<Trial>> {
+    let schedule_files = collect_schedule_files()?;
+    log::debug!("Found {} schedule files: {}", schedule_files.len(), &schedule_files);
+    let mut trials = Vec::with_capacity(schedule_files.len());
+    for schedule_file in schedule_files {
+        let h = handle.clone();
+        let trial_name = format!("Test schedule {}",
+            schedule_file.file_name()
+                .expect("Schedule file should have a name")
+                .to_string_lossy());
+        let trial = Trial::new(trial_name, move || h.block_on(run_schedule(schedule_file.clone())));
+        trials.push(trial);
+    }
+    Ok(trials)
+}
+
+fn collect_schedule_files() -> anyhow::Result<Vec<PathBuf>> {
+    let dir = PathBuf::from(format!("{}/testdata/schedules", env!("CARGO_MANIFEST_DIR")));
+    let mut schedule_files = Vec::with_capacity(32);
+    for entry in fs::read_dir(&dir)? {
+        let entry = entry?;
+        let path = entry.path();
+        if path.is_file() {
+            schedule_files.push(fs::canonicalize(dir.join(path))?);
+        }
+    }
+    Ok(schedule_files)
+}
+
+async fn run_schedule(schedule_file: PathBuf) -> anyhow::Result<()> {
+    let schedule = Schedule::parse(schedule_file).await?;
+    schedule.run().await?;
+    Ok(())
+}
\ No newline at end of file

From e99af6eefc2422639c0d099ec1ccc005a74ec315 Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Mon, 26 Aug 2024 21:41:55 +0800
Subject: [PATCH 04/10] Fix build break

---
 crates/sqllogictest/Cargo.toml               |  4 +-
 crates/sqllogictest/src/engine/datafusion.rs | 27 ++---
 crates/sqllogictest/src/engine/mod.rs        | 17 ++--
 crates/sqllogictest/src/engine/normalize.rs  | 99 +++++-------------
 crates/sqllogictest/src/engine/spark.rs      | 19 +++-
 crates/sqllogictest/src/error.rs             |  4 +
 crates/sqllogictest/src/lib.rs               |  2 +
 crates/sqllogictest/src/schedule.rs          |  7 +-
 .../testdata/docker/docker-compose.yaml      |  2 +
 9 files changed, 74 insertions(+), 107 deletions(-)
 create mode 100644 crates/sqllogictest/src/error.rs

diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml
index 0b6509a54f..5985cb9213 100644
--- a/crates/sqllogictest/Cargo.toml
+++ b/crates/sqllogictest/Cargo.toml
@@ -28,10 +28,10 @@ toml = "0.8.19"
 url = {workspace = true}
 iceberg-datafusion = { path = "../integrations/datafusion" }
 iceberg-catalog-rest = { path = "../catalog/rest" }
-
-[dev-dependencies]
 tokio = "1.38.0"
 env_logger = { workspace = true }
+
+[dev-dependencies]
 libtest-mimic = "0.7.3"
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 
diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs
index 4ad1c4775b..3a78bcf00f 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -20,16 +20,17 @@ use async_trait::async_trait;
 use datafusion::physical_plan::common::collect;
 use datafusion::physical_plan::execute_stream;
 use datafusion::prelude::{SessionConfig, SessionContext};
-use sqllogictest::DBOutput;
+use sqllogictest::{AsyncDB, DBOutput};
 use std::sync::Arc;
 use std::time::Duration;
 use anyhow::anyhow;
 use datafusion::catalog::CatalogProvider;
 use toml::Table;
-use iceberg_catalog_rest::RestCatalogConfig;
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
 use iceberg_datafusion::IcebergCatalogProvider;
 use crate::engine::normalize;
 use crate::engine::output::{DFColumnType, DFOutput};
+use crate::error::{Result, Error};
 
 pub struct DataFusionEngine {
     ctx: SessionContext,
@@ -49,12 +50,12 @@ impl Default for DataFusionEngine {
 }
 
 #[async_trait]
-impl sqllogictest::AsyncDB for DataFusionEngine {
-    type Error = anyhow::Error;
+impl AsyncDB for DataFusionEngine {
+    type Error = Error;
     type ColumnType = DFColumnType;
 
-    async fn run(&mut self, sql: &str) -> anyhow::Result<DFOutput> {
-        run_query(&self.ctx, sql).await
+    async fn run(&mut self, sql: &str) -> Result<DFOutput> {
+        run_query(&self.ctx, sql).await.map_err(Box::new)
     }
 
     /// Engine name of current database.
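The `type Error = Error;` switch in the hunk above is forced by the `AsyncDB` trait: its associated error type must implement `std::error::Error`, which `anyhow::Error` deliberately does not. The interim boxed alias added in the new error.rs below papers over this; patch 05 later settles on a newtype wrapper, whose shape is roughly:

    // Sketch of the newtype approach (the full impls land in patch 05's error.rs):
    pub struct Error(pub anyhow::Error);

    impl From<anyhow::Error> for Error {
        fn from(value: anyhow::Error) -> Self {
            Self(value)
        }
    }
    // With this From impl, `anyhow!(...).into()` and `?` convert at call sites.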
@@ -72,7 +73,7 @@ impl sqllogictest::AsyncDB for DataFusionEngine {
     }
 }
 
-async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> anyhow::Result<DFOutput> {
+async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DFOutput> {
     let df = ctx.sql(sql.into().as_str()).await?;
     let task_ctx = Arc::new(df.task_ctx());
     let plan = df.create_physical_plan().await?;
@@ -90,7 +91,7 @@ async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DFOutput> {
         Ok(DBOutput::Rows { types, rows })
     }
 }
 
 impl DataFusionEngine {
-    pub async fn new(configs: &Table) -> anyhow::Result<Self> {
+    pub async fn new(configs: &Table) -> Result<Self> {
         let config = SessionConfig::new()
             .with_target_partitions(4);
 
@@ -104,14 +105,16 @@ impl DataFusionEngine {
 
     async fn create_catalog(configs: &Table) -> anyhow::Result<Arc<dyn CatalogProvider>> {
         let rest_catalog_url = configs.get("url")
-            .ok_or_else(anyhow!("url not found datafusion engine!"))?
+            .ok_or_else(|| anyhow!("url not found datafusion engine!"))?
             .as_str()
-            .ok_or_else(anyhow!("url is not str"))?;
+            .ok_or_else(|| anyhow!("url is not str"))?;
 
-        let rest_catalog = RestCatalogConfig::builder()
+        let rest_catalog_config = RestCatalogConfig::builder()
             .uri(rest_catalog_url.to_string())
             .build();
 
-        Ok(Arc::new(IcebergCatalogProvider::try_new(Arc::new(rest_catalog)).await?))
+        let rest_catalog = RestCatalog::new(rest_catalog_config);
+
+        Ok(Arc::new(IcebergCatalogProvider::try_new(Arc::new(rest_catalog)).await?))
     }
 }
diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs
index 05e90db9e3..3ae7b6de24 100644
--- a/crates/sqllogictest/src/engine/mod.rs
+++ b/crates/sqllogictest/src/engine/mod.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use anyhow::anyhow;
+use anyhow::{anyhow, bail};
 pub use datafusion::*;
 use sqllogictest::{strict_column_validator, AsyncDB, MakeConnection, Runner};
 use std::sync::Arc;
@@ -30,6 +30,7 @@ pub use spark::*;
 
 mod datafusion;
 pub use datafusion::*;
+use crate::error::Result;
 
 #[derive(Clone)]
@@ -39,7 +40,7 @@ pub enum Engine {
 }
 
 impl Engine {
-    pub async fn new(typ: &str, configs: &Table) -> anyhow::Result<Self> {
+    pub async fn new(typ: &str, configs: &Table) -> Result<Self> {
         let configs = Arc::new(configs.clone());
         match typ {
             "spark" => {
@@ -48,22 +49,24 @@ impl Engine {
             "datafusion" => {
                 Ok(Engine::DataFusion(configs))
             }
-            other => Err(anyhow!("Unknown engine type: {other}"))
+            other => Err(anyhow!("Unknown engine type: {other}").into())
         }
     }
 
     pub async fn run_slt_file(self, slt_file: impl Into<String>) -> anyhow::Result<()> {
-        let absolute_file = format!("{}/testdata/slts/{}", env!("CARGO_MANIFEST_DIR"), slt_file);
+        let absolute_file = format!("{}/testdata/slts/{}", env!("CARGO_MANIFEST_DIR"), slt_file.into());
 
         match self {
             Engine::DataFusion(configs) => {
                 let configs = configs.clone();
-                let runner = Runner::new(async || DataFusionEngine::new(&*configs).await);
+                let runner = Runner::new(|| async {
+                    DataFusionEngine::new(&*configs).await
+                });
                 Self::run_with_runner(runner, absolute_file).await
             }
             Engine::SparkSQL(configs) => {
                 let configs = configs.clone();
-                let runner = Runner::new(async || {
+                let runner = Runner::new(|| async {
                     SparkSqlEngine::new(&*configs).await
                 });
                 Self::run_with_runner(runner, absolute_file).await
@@ -71,7 +74,7 @@ impl Engine {
         }
     }
 
-    async fn run_with_runner<D: AsyncDB, M: MakeConnection>(mut runner: Runner<D, M>,
+    async fn run_with_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(mut runner: Runner<D, M>,
         slt_file: String) -> anyhow::Result<()> {
         runner.with_column_validator(strict_column_validator);
         Ok(runner
             .run_file_async(slt_file)
             .await?)
diff --git a/crates/sqllogictest/src/engine/normalize.rs
b/crates/sqllogictest/src/engine/normalize.rs index 7ca16fa9ab..3100e9e924 100644 --- a/crates/sqllogictest/src/engine/normalize.rs +++ b/crates/sqllogictest/src/engine/normalize.rs @@ -15,19 +15,17 @@ // specific language governing permissions and limitations // under the License. -use arrow_schema::Fields; -use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; -use datafusion_common::DataFusionError; -use std::path::PathBuf; -use std::sync::OnceLock; -use arrow_array::{ArrayRef, RecordBatch}; use crate::engine::output::DFColumnType; +use anyhow::anyhow; +use arrow_array::{ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray}; +use arrow_schema::{DataType, Fields}; +use datafusion::arrow::util::display::ArrayFormatter; +use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; use crate::engine::conversion::*; -use crate::engine::datafusion::error::{DFSqlLogicTestError, Result}; /// Converts `batches` to a result as expected by sqllogicteset. -pub(crate) fn convert_batches(batches: Vec) -> Result>> { +pub(crate) fn convert_batches(batches: Vec) -> anyhow::Result>> { if batches.is_empty() { Ok(vec![]) } else { @@ -36,19 +34,17 @@ pub(crate) fn convert_batches(batches: Vec) -> Result) -> Result) -> impl Iterator> { +fn expand_row(mut row: Vec) -> impl Iterator> { use itertools::Either; use std::iter::once; @@ -115,65 +111,15 @@ fn expand_row(mut row: Vec) -> impl Iterator> { } } -/// normalize path references -/// -/// ```text -/// CsvExec: files={1 group: [[path/to/datafusion/testing/data/csv/aggregate_test_100.csv]]}, ... -/// ``` -/// -/// into: -/// -/// ```text -/// CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, ... -/// ``` -fn normalize_paths(mut row: Vec) -> Vec { - row.iter_mut().for_each(|s| { - let workspace_root: &str = workspace_root().as_ref(); - if s.contains(workspace_root) { - *s = s.replace(workspace_root, "WORKSPACE_ROOT"); - } - }); - row -} - -/// return the location of the datafusion checkout -fn workspace_root() -> &'static object_store::path::Path { - static WORKSPACE_ROOT_LOCK: OnceLock = OnceLock::new(); - WORKSPACE_ROOT_LOCK.get_or_init(|| { - // e.g. /Software/datafusion/datafusion/core - let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - - // e.g. /Software/datafusion/datafusion - let workspace_root = dir - .parent() - .expect("Can not find parent of datafusion/core") - // e.g. /Software/datafusion - .parent() - .expect("parent of datafusion") - .to_string_lossy(); - - let sanitized_workplace_root = if cfg!(windows) { - // Object store paths are delimited with `/`, e.g. `/datafusion/datafusion/testing/data/csv/aggregate_test_100.csv`. - // The default windows delimiter is `\`, so the workplace path is `datafusion\datafusion`. - workspace_root - .replace(std::path::MAIN_SEPARATOR, object_store::path::DELIMITER) - } else { - workspace_root.to_string() - }; - - object_store::path::Path::parse(sanitized_workplace_root).unwrap() - }) -} - /// Convert a single batch to a `Vec>` for comparison -fn convert_batch(batch: RecordBatch) -> Result>> { +fn convert_batch(batch: RecordBatch) -> anyhow::Result>> { (0..batch.num_rows()) .map(|row| { batch .columns() .iter() .map(|col| cell_to_string(col, row)) - .collect::>>() + .collect::>>() }) .collect() } @@ -196,7 +142,7 @@ macro_rules! get_row_value { /// /// Floating numbers are rounded to have a consistent representation with the Postgres runner. 
/// -pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { +pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result { if !col.is_valid(row) { // represent any null value with the string "NULL" Ok(NULL_STR.to_string()) @@ -204,35 +150,35 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { match col.data_type() { DataType::Null => Ok(NULL_STR.to_string()), DataType::Boolean => { - Ok(bool_to_str(get_row_value!(array::BooleanArray, col, row))) + Ok(bool_to_str(get_row_value!(BooleanArray, col, row))) } DataType::Float16 => { - Ok(f16_to_str(get_row_value!(array::Float16Array, col, row))) + Ok(f16_to_str(get_row_value!(Float16Array, col, row))) } DataType::Float32 => { - Ok(f32_to_str(get_row_value!(array::Float32Array, col, row))) + Ok(f32_to_str(get_row_value!(Float32Array, col, row))) } DataType::Float64 => { - Ok(f64_to_str(get_row_value!(array::Float64Array, col, row))) + Ok(f64_to_str(get_row_value!(Float64Array, col, row))) } DataType::Decimal128(precision, scale) => { - let value = get_row_value!(array::Decimal128Array, col, row); + let value = get_row_value!(Decimal128Array, col, row); Ok(i128_to_str(value, precision, scale)) } DataType::Decimal256(precision, scale) => { - let value = get_row_value!(array::Decimal256Array, col, row); + let value = get_row_value!(Decimal256Array, col, row); Ok(i256_to_str(value, precision, scale)) } DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!( - array::LargeStringArray, + LargeStringArray, col, row ))), DataType::Utf8 => { - Ok(varchar_to_str(get_row_value!(array::StringArray, col, row))) + Ok(varchar_to_str(get_row_value!(StringArray, col, row))) } DataType::Utf8View => Ok(varchar_to_str(get_row_value!( - array::StringViewArray, + StringViewArray, col, row ))), @@ -241,7 +187,6 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { Ok(f.unwrap().value(row).to_string()) } } - .map_err(DFSqlLogicTestError::Arrow) } } diff --git a/crates/sqllogictest/src/engine/spark.rs b/crates/sqllogictest/src/engine/spark.rs index d7772fdbff..bf94e5db10 100644 --- a/crates/sqllogictest/src/engine/spark.rs +++ b/crates/sqllogictest/src/engine/spark.rs @@ -22,21 +22,30 @@ use itertools::Itertools; use spark_connect_rs::{SparkSession, SparkSessionBuilder}; use sqllogictest::{AsyncDB, DBOutput}; use std::time::Duration; +use async_trait::async_trait; use toml::Table; +use crate::error::*; /// SparkSql engine implementation for sqllogictest. pub struct SparkSqlEngine { session: SparkSession, } +#[async_trait] impl AsyncDB for SparkSqlEngine { - type Error = anyhow::Error; + type Error = Error; type ColumnType = DFColumnType; - async fn run(&mut self, sql: &str) -> anyhow::Result> { - let results = self.session.sql(sql).await?.collect()?; + async fn run(&mut self, sql: &str) -> Result> { + let results = self.session + .sql(sql) + .await + .map_err(Box::new)? + .collect() + .await + .map_err(Box::new)?; let types = normalize::convert_schema_to_types(results.schema().fields()); - let rows = crate::engine::normalize::convert_batches(results)?; + let rows = normalize::convert_batches(results)?; if rows.is_empty() && types.is_empty() { Ok(DBOutput::StatementComplete(0)) @@ -61,7 +70,7 @@ impl AsyncDB for SparkSqlEngine { } impl SparkSqlEngine { - pub async fn new(configs: &Table) -> anyhow::Result { + pub async fn new(configs: &Table) -> Result { let url = configs.get("url") .ok_or_else(|| anyhow!("url property doesn't exist for spark engine"))? 
.as_str() diff --git a/crates/sqllogictest/src/error.rs b/crates/sqllogictest/src/error.rs new file mode 100644 index 0000000000..a6abf74fdb --- /dev/null +++ b/crates/sqllogictest/src/error.rs @@ -0,0 +1,4 @@ + + +pub type Error = Box; +pub type Result = std::result::Result; diff --git a/crates/sqllogictest/src/lib.rs b/crates/sqllogictest/src/lib.rs index c4afb16164..40fdc93b9e 100644 --- a/crates/sqllogictest/src/lib.rs +++ b/crates/sqllogictest/src/lib.rs @@ -19,3 +19,5 @@ // [Apache Datafusion](https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest) mod engine; pub mod schedule; +mod error; +pub use error::*; diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index 0a148bf222..1fd627d252 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. +use crate::engine::Engine; +use anyhow::anyhow; +use itertools::Itertools; use std::collections::HashMap; use std::fs::read_to_string; use std::path::Path; -use anyhow::anyhow; -use itertools::Itertools; use toml::{Table, Value}; -use toml::value::Array; -use crate::engine::Engine; /// Schedule of engines to run tests. pub struct Schedule { diff --git a/crates/sqllogictest/testdata/docker/docker-compose.yaml b/crates/sqllogictest/testdata/docker/docker-compose.yaml index 8c3e32f3ff..894b37ac7a 100644 --- a/crates/sqllogictest/testdata/docker/docker-compose.yaml +++ b/crates/sqllogictest/testdata/docker/docker-compose.yaml @@ -71,6 +71,8 @@ services: - AWS_REGION=us-east-1 - SPARK_HOME=/opt/spark - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin + - HTTP_PROXY=http://host.docker.internal:7890 + - HTTPS_PROXY=http://host.docker.internal:7890 user: root links: - minio:icebergdata.minio From 0c92b0c66b7d7eaaf585ba1e80157b22e4c635e2 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 13 Sep 2024 16:26:26 +0800 Subject: [PATCH 05/10] Make clippy happy --- Cargo.toml | 3 +- .../src/writer/file_writer/track_writer.rs | 3 +- crates/sqllogictest/src/error.rs | 4 - .../Cargo.toml | 9 +- .../sqllogictests/src/display/conversion.rs | 82 +++++++ .../src/display/conversion_51.rs} | 30 +-- crates/sqllogictests/src/display/mod.rs | 4 + .../src/display}/normalize.rs | 78 +++---- .../sqllogictests/src/display/normalize_51.rs | 205 ++++++++++++++++++ .../src/engine/datafusion.rs | 45 ++-- .../src/engine/mod.rs | 48 ++-- .../src/engine/output.rs | 2 +- .../src/engine/spark.rs | 32 +-- crates/sqllogictests/src/error.rs | 28 +++ .../src/lib.rs | 4 +- .../src/schedule.rs | 55 ++--- .../testdata/docker/docker-compose.yaml | 0 .../docker/spark/spark-connect-server.sh | 0 .../testdata/schedules/demo.toml | 0 .../testdata/slts/demo/prepare.slt | 0 .../testdata/slts/demo/verify.slt | 0 .../tests/sqllogictests.rs | 34 ++- 22 files changed, 479 insertions(+), 187 deletions(-) delete mode 100644 crates/sqllogictest/src/error.rs rename crates/{sqllogictest => sqllogictests}/Cargo.toml (87%) create mode 100644 crates/sqllogictests/src/display/conversion.rs rename crates/{sqllogictest/src/engine/conversion.rs => sqllogictests/src/display/conversion_51.rs} (70%) create mode 100644 crates/sqllogictests/src/display/mod.rs rename crates/{sqllogictest/src/engine => sqllogictests/src/display}/normalize.rs (77%) create mode 100644 crates/sqllogictests/src/display/normalize_51.rs rename crates/{sqllogictest => sqllogictests}/src/engine/datafusion.rs 
(86%) rename crates/{sqllogictest => sqllogictests}/src/engine/mod.rs (69%) rename crates/{sqllogictest => sqllogictests}/src/engine/output.rs (97%) rename crates/{sqllogictest => sqllogictests}/src/engine/spark.rs (82%) create mode 100644 crates/sqllogictests/src/error.rs rename crates/{sqllogictest => sqllogictests}/src/lib.rs (98%) rename crates/{sqllogictest => sqllogictests}/src/schedule.rs (79%) rename crates/{sqllogictest => sqllogictests}/testdata/docker/docker-compose.yaml (100%) rename crates/{sqllogictest => sqllogictests}/testdata/docker/spark/spark-connect-server.sh (100%) rename crates/{sqllogictest => sqllogictests}/testdata/schedules/demo.toml (100%) rename crates/{sqllogictest => sqllogictests}/testdata/slts/demo/prepare.slt (100%) rename crates/{sqllogictest => sqllogictests}/testdata/slts/demo/verify.slt (100%) rename crates/{sqllogictest => sqllogictests}/tests/sqllogictests.rs (74%) diff --git a/Cargo.toml b/Cargo.toml index e3f431e588..7ef5e101b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ members = [ "crates/iceberg", "crates/integrations/*", "crates/test_utils", - "crates/sqllogictest", + "crates/sqllogictests", ] exclude = ["bindings/python"] @@ -40,6 +40,7 @@ rust-version = "1.77.1" anyhow = "1.0.72" apache-avro = "0.17" array-init = "2" +arrow = { version = "52" } arrow-arith = { version = "52" } arrow-array = { version = "52" } arrow-ord = { version = "52" } diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs index 6c60a1aa70..7b916aeb58 100644 --- a/crates/iceberg/src/writer/file_writer/track_writer.rs +++ b/crates/iceberg/src/writer/file_writer/track_writer.rs @@ -42,10 +42,9 @@ impl TrackWriter { impl FileWrite for TrackWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let size = bs.len(); - self.inner.write(bs).await.map(|v| { + self.inner.write(bs).await.inspect(|_| { self.written_size .fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed); - v }) } diff --git a/crates/sqllogictest/src/error.rs b/crates/sqllogictest/src/error.rs deleted file mode 100644 index a6abf74fdb..0000000000 --- a/crates/sqllogictest/src/error.rs +++ /dev/null @@ -1,4 +0,0 @@ - - -pub type Error = Box; -pub type Result = std::result::Result; diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictests/Cargo.toml similarity index 87% rename from crates/sqllogictest/Cargo.toml rename to crates/sqllogictests/Cargo.toml index 5985cb9213..b481125e53 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictests/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "sqllogictest" +name = "sqllogictests" version.workspace = true edition.workspace = true homepage.workspace = true @@ -8,10 +8,11 @@ license.workspace = true rust-version.workspace = true [dependencies] -arrow-schema = { workspace = true } -arrow-array= { workspace = true } +arrow = { workspace = true } +# For spark-connect-rs +arrow_51 = { version = "51", package = "arrow"} async-trait = { workspace = true } -sqllogictest = "0.21.0" +sqllogictest = "0.22" datafusion = { workspace = true, default-features = true} datafusion-common = { workspace = true, default-features = true} thiserror = "1.0.63" diff --git a/crates/sqllogictests/src/display/conversion.rs b/crates/sqllogictests/src/display/conversion.rs new file mode 100644 index 0000000000..d68afaaf00 --- /dev/null +++ b/crates/sqllogictests/src/display/conversion.rs @@ -0,0 +1,82 @@ +use arrow::array::types::{Decimal128Type, Decimal256Type, DecimalType}; +use 
arrow::datatypes::i256; +use bigdecimal::BigDecimal; +use half::f16; +use rust_decimal::prelude::*; + +/// Represents a constant for NULL string in your database. +pub const NULL_STR: &str = "NULL"; + +pub(crate) fn bool_to_str(value: bool) -> String { + if value { + "true".to_string() + } else { + "false".to_string() + } +} + +pub(crate) fn varchar_to_str(value: &str) -> String { + if value.is_empty() { + "(empty)".to_string() + } else { + value.trim_end_matches('\n').to_string() + } +} + +pub(crate) fn f16_to_str(value: f16) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f16::INFINITY { + "Infinity".to_string() + } else if value == f16::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn f32_to_str(value: f32) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f32::INFINITY { + "Infinity".to_string() + } else if value == f32::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn f64_to_str(value: f64) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f64::INFINITY { + "Infinity".to_string() + } else if value == f64::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String { + big_decimal_to_str( + BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale)).unwrap(), + ) +} + +pub(crate) fn i256_to_str(value: i256, precision: &u8, scale: &i8) -> String { + big_decimal_to_str( + BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale)).unwrap(), + ) +} + +pub(crate) fn big_decimal_to_str(value: BigDecimal) -> String { + value.round(12).normalized().to_string() +} diff --git a/crates/sqllogictest/src/engine/conversion.rs b/crates/sqllogictests/src/display/conversion_51.rs similarity index 70% rename from crates/sqllogictest/src/engine/conversion.rs rename to crates/sqllogictests/src/display/conversion_51.rs index 937f43520b..c5ef69c884 100644 --- a/crates/sqllogictest/src/engine/conversion.rs +++ b/crates/sqllogictests/src/display/conversion_51.rs @@ -1,26 +1,10 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
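The conversion helpers above exist so every engine renders values the way the .slt expectations are written: floats and decimals are routed through BigDecimal, rounded to 12 places, and normalized, and NaN loses its sign. Illustrative behavior, assuming the helpers above are in scope (values chosen for the example):

    use std::str::FromStr;
    use bigdecimal::BigDecimal;

    // Decimal128Type::format_decimal(12345, 10, 3) renders "12.345";
    // big_decimal_to_str then trims trailing zeros after rounding:
    assert_eq!(big_decimal_to_str(BigDecimal::from_str("1.2300").unwrap()), "1.23");
    assert_eq!(f64_to_str(f64::NAN), "NaN"); // NaN's sign is platform-dependent, so it is dropped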
- -use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType}; +use arrow_51::array::types::{Decimal128Type, Decimal256Type, DecimalType}; +use arrow_51::datatypes::i256; use bigdecimal::BigDecimal; -use datafusion_common::arrow::datatypes::i256; use half::f16; use rust_decimal::prelude::*; + /// Represents a constant for NULL string in your database. pub const NULL_STR: &str = "NULL"; @@ -84,18 +68,16 @@ pub(crate) fn f64_to_str(value: f64) -> String { pub(crate) fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String { big_decimal_to_str( - BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale)) - .unwrap(), + BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale)).unwrap(), ) } pub(crate) fn i256_to_str(value: i256, precision: &u8, scale: &i8) -> String { big_decimal_to_str( - BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale)) - .unwrap(), + BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale)).unwrap(), ) } pub(crate) fn big_decimal_to_str(value: BigDecimal) -> String { value.round(12).normalized().to_string() -} \ No newline at end of file +} diff --git a/crates/sqllogictests/src/display/mod.rs b/crates/sqllogictests/src/display/mod.rs new file mode 100644 index 0000000000..55fe9d2ad9 --- /dev/null +++ b/crates/sqllogictests/src/display/mod.rs @@ -0,0 +1,4 @@ +pub mod conversion; +pub mod conversion_51; +pub mod normalize; +pub mod normalize_51; diff --git a/crates/sqllogictest/src/engine/normalize.rs b/crates/sqllogictests/src/display/normalize.rs similarity index 77% rename from crates/sqllogictest/src/engine/normalize.rs rename to crates/sqllogictests/src/display/normalize.rs index 3100e9e924..2a1ffd9384 100644 --- a/crates/sqllogictest/src/engine/normalize.rs +++ b/crates/sqllogictests/src/display/normalize.rs @@ -15,14 +15,16 @@ // specific language governing permissions and limitations // under the License. -use crate::engine::output::DFColumnType; use anyhow::anyhow; -use arrow_array::{ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray}; -use arrow_schema::{DataType, Fields}; -use datafusion::arrow::util::display::ArrayFormatter; +use arrow::array::{ + ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array, + Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray, +}; +use arrow::datatypes::{DataType, Fields}; +use arrow::util::display::ArrayFormatter; use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; - -use crate::engine::conversion::*; +use crate::display::conversion::*; +use crate::engine::output::DFColumnType; /// Converts `batches` to a result as expected by sqllogicteset. 
pub(crate) fn convert_batches(batches: Vec) -> anyhow::Result>> { @@ -35,16 +37,13 @@ pub(crate) fn convert_batches(batches: Vec) -> anyhow::Result) -> anyhow::Result) -> impl Iterator> { - use itertools::Either; +fn expand_row(mut row: Vec) -> impl Iterator> { use std::iter::once; + use itertools::Either; + // check last cell if let Some(cell) = row.pop() { let lines: Vec<_> = cell.split('\n').collect(); @@ -93,7 +93,7 @@ fn expand_row(mut row: Vec) -> impl Iterator> { .enumerate() .map(|(idx, l)| { // replace any leading spaces with '-' as - // `sqllogictest` ignores whitespace differences + // `sqllogictests` ignores whitespace differences // // See https://github.com/apache/datafusion/issues/6328 let content = l.trim_start(); @@ -141,7 +141,6 @@ macro_rules! get_row_value { /// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings /// /// Floating numbers are rounded to have a consistent representation with the Postgres runner. -/// pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result { if !col.is_valid(row) { // represent any null value with the string "NULL" @@ -149,18 +148,10 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result { } else { match col.data_type() { DataType::Null => Ok(NULL_STR.to_string()), - DataType::Boolean => { - Ok(bool_to_str(get_row_value!(BooleanArray, col, row))) - } - DataType::Float16 => { - Ok(f16_to_str(get_row_value!(Float16Array, col, row))) - } - DataType::Float32 => { - Ok(f32_to_str(get_row_value!(Float32Array, col, row))) - } - DataType::Float64 => { - Ok(f64_to_str(get_row_value!(Float64Array, col, row))) - } + DataType::Boolean => Ok(bool_to_str(get_row_value!(BooleanArray, col, row))), + DataType::Float16 => Ok(f16_to_str(get_row_value!(Float16Array, col, row))), + DataType::Float32 => Ok(f32_to_str(get_row_value!(Float32Array, col, row))), + DataType::Float64 => Ok(f64_to_str(get_row_value!(Float64Array, col, row))), DataType::Decimal128(precision, scale) => { let value = get_row_value!(Decimal128Array, col, row); Ok(i128_to_str(value, precision, scale)) @@ -169,19 +160,9 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result { let value = get_row_value!(Decimal256Array, col, row); Ok(i256_to_str(value, precision, scale)) } - DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!( - LargeStringArray, - col, - row - ))), - DataType::Utf8 => { - Ok(varchar_to_str(get_row_value!(StringArray, col, row))) - } - DataType::Utf8View => Ok(varchar_to_str(get_row_value!( - StringViewArray, - col, - row - ))), + DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(LargeStringArray, col, row))), + DataType::Utf8 => Ok(varchar_to_str(get_row_value!(StringArray, col, row))), + DataType::Utf8View => Ok(varchar_to_str(get_row_value!(StringViewArray, col, row))), _ => { let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); Ok(f.unwrap().value(row).to_string()) @@ -210,15 +191,12 @@ pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec { | DataType::Float64 | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => DFColumnType::Float, - DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { - DFColumnType::Text + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => DFColumnType::Text, + DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) => { + DFColumnType::DateTime } - DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) => 
DFColumnType::DateTime, DataType::Timestamp(_, _) => DFColumnType::Timestamp, _ => DFColumnType::Another, }) .collect() -} \ No newline at end of file +} diff --git a/crates/sqllogictests/src/display/normalize_51.rs b/crates/sqllogictests/src/display/normalize_51.rs new file mode 100644 index 0000000000..dd620c23c8 --- /dev/null +++ b/crates/sqllogictests/src/display/normalize_51.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::anyhow; +use arrow_51::util::display::{DurationFormat, FormatOptions}; +use arrow_51::array::{ + ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array, + Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray, +}; +use arrow_51::datatypes::{DataType, Fields}; +use arrow_51::util::display::ArrayFormatter; +use crate::display::conversion_51::*; +use crate::engine::output::DFColumnType; + +const DEFAULT_FORMAT_OPTIONS: FormatOptions<'static> = + FormatOptions::new().with_duration_format(DurationFormat::Pretty); + +/// Converts `batches` to a result as expected by sqllogicteset. +pub(crate) fn convert_batches(batches: Vec) -> anyhow::Result>> { + if batches.is_empty() { + Ok(vec![]) + } else { + let schema = batches[0].schema(); + let mut rows = vec![]; + for batch in batches { + // Verify schema + if !schema.contains(&batch.schema()) { + return Err(anyhow!( + "Schema mismatch. 
Previously had\n{:#?}\n\nGot:\n{:#?}", + &schema, + batch.schema() + )); + } + + let new_rows = convert_batch(batch)?.into_iter().flat_map(expand_row); + rows.extend(new_rows); + } + Ok(rows) + } +} + +/// special case rows that have newlines in them (like explain plans) +// +/// Transform inputs like: +/// ```text +/// [ +/// "logical_plan", +/// "Sort: d.b ASC NULLS LAST\n Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +/// +/// Into one cell per line, adding lines if necessary +/// ```text +/// [ +/// "logical_plan", +/// ] +/// [ +/// "Sort: d.b ASC NULLS LAST", +/// ] +/// [ <--- newly added row +/// "|-- Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +fn expand_row(mut row: Vec) -> impl Iterator> { + use std::iter::once; + + use itertools::Either; + + // check last cell + if let Some(cell) = row.pop() { + let lines: Vec<_> = cell.split('\n').collect(); + + // no newlines in last cell + if lines.len() < 2 { + row.push(cell); + return Either::Left(once(row)); + } + + // form new rows with each additional line + let new_lines: Vec<_> = lines + .into_iter() + .enumerate() + .map(|(idx, l)| { + // replace any leading spaces with '-' as + // `sqllogictests` ignores whitespace differences + // + // See https://github.com/apache/datafusion/issues/6328 + let content = l.trim_start(); + let new_prefix = "-".repeat(l.len() - content.len()); + // maintain for each line a number, so + // reviewing explain result changes is easier + let line_num = idx + 1; + vec![format!("{line_num:02}){new_prefix}{content}")] + }) + .collect(); + + Either::Right(once(row).chain(new_lines)) + } else { + Either::Left(once(row)) + } +} + +/// Convert a single batch to a `Vec>` for comparison +fn convert_batch(batch: RecordBatch) -> anyhow::Result>> { + (0..batch.num_rows()) + .map(|row| { + batch + .columns() + .iter() + .map(|col| cell_to_string(col, row)) + .collect::>>() + }) + .collect() +} + +macro_rules! get_row_value { + ($array_type:ty, $column: ident, $row: ident) => {{ + let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + + array.value($row) + }}; +} + +/// Normalizes the content of a single cell in RecordBatch prior to printing. +/// +/// This is to make the output comparable to the semi-standard .slt format +/// +/// Normalizations applied to [NULL Values and empty strings] +/// +/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings +/// +/// Floating numbers are rounded to have a consistent representation with the Postgres runner. 
+pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result { + if !col.is_valid(row) { + // represent any null value with the string "NULL" + Ok(NULL_STR.to_string()) + } else { + match col.data_type() { + DataType::Null => Ok(NULL_STR.to_string()), + DataType::Boolean => Ok(bool_to_str(get_row_value!(BooleanArray, col, row))), + DataType::Float16 => Ok(f16_to_str(get_row_value!(Float16Array, col, row))), + DataType::Float32 => Ok(f32_to_str(get_row_value!(Float32Array, col, row))), + DataType::Float64 => Ok(f64_to_str(get_row_value!(Float64Array, col, row))), + DataType::Decimal128(precision, scale) => { + let value = get_row_value!(Decimal128Array, col, row); + Ok(i128_to_str(value, precision, scale)) + } + DataType::Decimal256(precision, scale) => { + let value = get_row_value!(Decimal256Array, col, row); + Ok(i256_to_str(value, precision, scale)) + } + DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(LargeStringArray, col, row))), + DataType::Utf8 => Ok(varchar_to_str(get_row_value!(StringArray, col, row))), + DataType::Utf8View => Ok(varchar_to_str(get_row_value!(StringViewArray, col, row))), + _ => { + let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); + Ok(f.unwrap().value(row).to_string()) + } + } + } +} + +/// Converts columns to a result as expected by sqllogicteset. +pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec { + columns + .iter() + .map(|f| f.data_type()) + .map(|data_type| match data_type { + DataType::Boolean => DFColumnType::Boolean, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => DFColumnType::Integer, + DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => DFColumnType::Float, + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => DFColumnType::Text, + DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) => { + DFColumnType::DateTime + } + DataType::Timestamp(_, _) => DFColumnType::Timestamp, + _ => DFColumnType::Another, + }) + .collect() +} diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictests/src/engine/datafusion.rs similarity index 86% rename from crates/sqllogictest/src/engine/datafusion.rs rename to crates/sqllogictests/src/engine/datafusion.rs index 3a78bcf00f..46cec9166b 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictests/src/engine/datafusion.rs @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. 
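A note on why the `_51` modules above exist at all: DataFusion 41 is built against arrow 52, while spark-connect-rs still produces arrow 51 batches (per the "For spark-connect-rs" comment in Cargo.toml), and the two crate versions define distinct, incompatible types. The `arrow_51 = { version = "51", package = "arrow" }` entry lets both versions coexist under different names, at the cost of the duplicated display code:

    // The same crate under two names; types are not interchangeable across versions.
    use arrow::array::RecordBatch;                      // arrow 52, DataFusion results
    use arrow_51::array::RecordBatch as RecordBatch51;  // arrow 51, spark-connect-rs results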
-use arrow_array::RecordBatch; -use async_trait::async_trait; -use datafusion::physical_plan::common::collect; -use datafusion::physical_plan::execute_stream; -use datafusion::prelude::{SessionConfig, SessionContext}; -use sqllogictest::{AsyncDB, DBOutput}; use std::sync::Arc; use std::time::Duration; + use anyhow::anyhow; +use arrow::array::RecordBatch; +use async_trait::async_trait; use datafusion::catalog::CatalogProvider; -use toml::Table; +use datafusion::physical_plan::common::collect; +use datafusion::physical_plan::execute_stream; +use datafusion::prelude::{SessionConfig, SessionContext}; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_datafusion::IcebergCatalogProvider; -use crate::engine::normalize; +use sqllogictest::{AsyncDB, DBOutput}; +use toml::Table; + +use crate::display::normalize; use crate::engine::output::{DFColumnType, DFOutput}; -use crate::error::{Result, Error}; +use crate::error::{Error, Result}; pub struct DataFusionEngine { ctx: SessionContext, @@ -38,14 +40,11 @@ pub struct DataFusionEngine { impl Default for DataFusionEngine { fn default() -> Self { - let config = SessionConfig::new() - .with_target_partitions(4); + let config = SessionConfig::new().with_target_partitions(4); let ctx = SessionContext::new_with_config(config); - Self { - ctx - } + Self { ctx } } } @@ -55,7 +54,7 @@ impl AsyncDB for DataFusionEngine { type ColumnType = DFColumnType; async fn run(&mut self, sql: &str) -> Result { - run_query(&self.ctx, sql).await.map_err(Box::new) + Ok(run_query(&self.ctx, sql).await?) } /// Engine name of current database. @@ -73,7 +72,7 @@ impl AsyncDB for DataFusionEngine { } } -async fn run_query(ctx: &SessionContext, sql: impl Into) -> Result { +async fn run_query(ctx: &SessionContext, sql: impl Into) -> anyhow::Result { let df = ctx.sql(sql.into().as_str()).await?; let task_ctx = Arc::new(df.task_ctx()); let plan = df.create_physical_plan().await?; @@ -92,19 +91,17 @@ async fn run_query(ctx: &SessionContext, sql: impl Into) -> Result Result { - let config = SessionConfig::new() - .with_target_partitions(4); + let config = SessionConfig::new().with_target_partitions(4); let ctx = SessionContext::new_with_config(config); ctx.register_catalog("demo", Self::create_catalog(configs).await?); - Ok(Self { - ctx - }) + Ok(Self { ctx }) } async fn create_catalog(configs: &Table) -> anyhow::Result> { - let rest_catalog_url = configs.get("url") + let rest_catalog_url = configs + .get("url") .ok_or_else(|| anyhow!("url not found datafusion engine!"))? .as_str() .ok_or_else(|| anyhow!("url is not str"))?; @@ -115,6 +112,8 @@ impl DataFusionEngine { let rest_catalog = RestCatalog::new(rest_catalog_config); - Ok(Arc::new(IcebergCatalogProvider::try_new(Arc::new(rest_catalog)).await?)) + Ok(Arc::new( + IcebergCatalogProvider::try_new(Arc::new(rest_catalog)).await?, + )) } } diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictests/src/engine/mod.rs similarity index 69% rename from crates/sqllogictest/src/engine/mod.rs rename to crates/sqllogictests/src/engine/mod.rs index 3ae7b6de24..7e31559243 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictests/src/engine/mod.rs @@ -15,23 +15,21 @@ // specific language governing permissions and limitations // under the License. 
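For reference, `run_query` in the datafusion.rs hunks above follows the standard DataFusion flow: plan the SQL into a DataFrame, take its task context, lower to a physical plan, then drain the stream into batches. A condensed sketch of the same path (DataFusion 41 API, mirroring the code in that file):

    use std::sync::Arc;
    use datafusion::arrow::array::RecordBatch;
    use datafusion::physical_plan::common::collect;
    use datafusion::physical_plan::execute_stream;
    use datafusion::prelude::SessionContext;

    async fn batches(ctx: &SessionContext, sql: &str) -> anyhow::Result<Vec<RecordBatch>> {
        let df = ctx.sql(sql).await?;                // parse, resolve, logical plan
        let task_ctx = Arc::new(df.task_ctx());      // taken before the plan consumes df
        let plan = df.create_physical_plan().await?; // physical plan
        Ok(collect(execute_stream(plan, task_ctx)?).await?)
    }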
 
-use anyhow::{anyhow, bail};
+use std::sync::Arc;
+
+use anyhow::anyhow;
 pub use datafusion::*;
 use sqllogictest::{strict_column_validator, AsyncDB, MakeConnection, Runner};
-use std::sync::Arc;
 use toml::Table;
 
-mod conversion;
-mod output;
-mod normalize;
+pub mod output;
 mod spark;
 
 pub use spark::*;
 
 mod datafusion;
-pub use datafusion::*;
-
-use crate::error::Result;
+use crate::error::Result;
 
 #[derive(Clone)]
 pub enum Engine {
@@ -43,42 +41,38 @@ pub enum Engine {
 }
 
 impl Engine {
     pub async fn new(typ: &str, configs: &Table) -> Result<Self> {
         let configs = Arc::new(configs.clone());
         match typ {
-            "spark" => {
-                Ok(Engine::SparkSQL(configs))
-            }
-            "datafusion" => {
-                Ok(Engine::DataFusion(configs))
-            }
-            other => Err(anyhow!("Unknown engine type: {other}").into())
+            "spark" => Ok(Engine::SparkSQL(configs)),
+            "datafusion" => Ok(Engine::DataFusion(configs)),
+            other => Err(anyhow!("Unknown engine type: {other}").into()),
         }
     }
 
     pub async fn run_slt_file(self, slt_file: impl Into<String>) -> anyhow::Result<()> {
-        let absolute_file = format!("{}/testdata/slts/{}", env!("CARGO_MANIFEST_DIR"), slt_file.into());
+        let absolute_file = format!(
+            "{}/testdata/slts/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            slt_file.into()
+        );
 
         match self {
             Engine::DataFusion(configs) => {
                 let configs = configs.clone();
-                let runner = Runner::new(|| async {
-                    DataFusionEngine::new(&*configs).await
-                });
+                let runner = Runner::new(|| async { DataFusionEngine::new(&configs).await });
                 Self::run_with_runner(runner, absolute_file).await
             }
             Engine::SparkSQL(configs) => {
                 let configs = configs.clone();
-                let runner = Runner::new(|| async {
-                    SparkSqlEngine::new(&*configs).await
-                });
+                let runner = Runner::new(|| async { SparkSqlEngine::new(&configs).await });
                 Self::run_with_runner(runner, absolute_file).await
             }
         }
     }
 
-    async fn run_with_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(mut runner: Runner<D, M>,
-        slt_file: String) -> anyhow::Result<()> {
+    async fn run_with_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(
+        mut runner: Runner<D, M>,
+        slt_file: String,
+    ) -> anyhow::Result<()> {
         runner.with_column_validator(strict_column_validator);
-        Ok(runner
-            .run_file_async(slt_file)
-            .await?)
+        Ok(runner.run_file_async(slt_file).await?)
     }
-}
\ No newline at end of file
+}
diff --git a/crates/sqllogictest/src/engine/output.rs b/crates/sqllogictests/src/engine/output.rs
similarity index 97%
rename from crates/sqllogictest/src/engine/output.rs
rename to crates/sqllogictests/src/engine/output.rs
index ae1030ca4a..24299856e0 100644
--- a/crates/sqllogictest/src/engine/output.rs
+++ b/crates/sqllogictests/src/engine/output.rs
@@ -54,4 +54,4 @@ impl ColumnType for DFColumnType {
     }
 }
 
-pub(crate) type DFOutput = DBOutput<DFColumnType>;
\ No newline at end of file
+pub(crate) type DFOutput = DBOutput<DFColumnType>;
diff --git a/crates/sqllogictest/src/engine/spark.rs b/crates/sqllogictests/src/engine/spark.rs
similarity index 82%
rename from crates/sqllogictest/src/engine/spark.rs
rename to crates/sqllogictests/src/engine/spark.rs
index bf94e5db10..47d0ac32ec 100644
--- a/crates/sqllogictest/src/engine/spark.rs
+++ b/crates/sqllogictests/src/engine/spark.rs
@@ -15,18 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
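The `strict_column_validator` installed in `run_with_runner` above checks each query's declared column signature (the `II` in `query II rowsort` from verify.slt, for instance) against the column types the engine actually reports, rather than comparing rendered strings alone. In sketch form, the wiring for one engine looks like:

    // Sketch: construct, configure, run (error handling as in the code above).
    let mut runner = Runner::new(|| async { DataFusionEngine::new(&configs).await });
    runner.with_column_validator(strict_column_validator);
    runner.run_file_async(absolute_file).await?;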
 
-use crate::engine::output::DFColumnType;
-use crate::engine::{normalize, DataFusionEngine};
+use std::time::Duration;
+
 use anyhow::anyhow;
-use itertools::Itertools;
+use async_trait::async_trait;
 use spark_connect_rs::{SparkSession, SparkSessionBuilder};
 use sqllogictest::{AsyncDB, DBOutput};
-use std::time::Duration;
-use async_trait::async_trait;
 use toml::Table;
-use crate::error::*;
 
+use crate::engine::output::DFColumnType;
+use crate::display::normalize_51;
+use crate::error::{Error, Result};
+
-/// SparkSql engine implementation for sqllogictest.
+/// SparkSql engine implementation for sqllogictests.
 pub struct SparkSqlEngine {
     session: SparkSession,
 }
@@ -37,15 +38,16 @@ impl AsyncDB for SparkSqlEngine {
     type ColumnType = DFColumnType;
 
     async fn run(&mut self, sql: &str) -> Result<DBOutput<Self::ColumnType>> {
-        let results = self.session
+        let results = self
+            .session
             .sql(sql)
             .await
-            .map_err(Box::new)?
+            .map_err(|e| anyhow!(e))?
             .collect()
             .await
-            .map_err(Box::new)?;
-        let types = normalize::convert_schema_to_types(results.schema().fields());
-        let rows = normalize::convert_batches(results)?;
+            .map_err(|e| anyhow!(e))?;
+        let types = normalize_51::convert_schema_to_types(results.schema().fields());
+        let rows = normalize_51::convert_batches(vec![results])?;
 
         if rows.is_empty() && types.is_empty() {
             Ok(DBOutput::StatementComplete(0))
@@ -71,7 +73,8 @@ impl AsyncDB for SparkSqlEngine {
 
 impl SparkSqlEngine {
     pub async fn new(configs: &Table) -> Result<Self> {
-        let url = configs.get("url")
+        let url = configs
+            .get("url")
             .ok_or_else(|| anyhow!("url property doesn't exist for spark engine"))?
             .as_str()
             .ok_or_else(|| anyhow!("url property is not a string for spark engine"))?;
 
         let session = SparkSessionBuilder::remote(url)
             .app_name("SparkConnect")
             .build()
-            .await?;
+            .await
+            .map_err(|e| anyhow!(e))?;
 
         Ok(Self { session })
     }
diff --git a/crates/sqllogictests/src/error.rs b/crates/sqllogictests/src/error.rs
new file mode 100644
index 0000000000..a08f758b13
--- /dev/null
+++ b/crates/sqllogictests/src/error.rs
@@ -0,0 +1,28 @@
+use std::fmt::{Debug, Display, Formatter};
+
+pub struct Error(pub anyhow::Error);
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl Debug for Error {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self.0)
+    }
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl std::error::Error for Error {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        self.0.source()
+    }
+}
+
+impl From<anyhow::Error> for Error {
+    fn from(value: anyhow::Error) -> Self {
+        Self(value)
+    }
+}
diff --git a/crates/sqllogictest/src/lib.rs b/crates/sqllogictests/src/lib.rs
similarity index 98%
rename from crates/sqllogictest/src/lib.rs
rename to crates/sqllogictests/src/lib.rs
index 40fdc93b9e..de0d213973 100644
--- a/crates/sqllogictest/src/lib.rs
+++ b/crates/sqllogictests/src/lib.rs
@@ -18,6 +18,8 @@
 // This lib contains codes copied from
 // [Apache Datafusion](https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest)
 mod engine;
-pub mod schedule;
 mod error;
+pub mod schedule;
+mod display;
+
 pub use error::*;
diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictests/src/schedule.rs
similarity index 79%
rename from crates/sqllogictest/src/schedule.rs
rename to crates/sqllogictests/src/schedule.rs
index 1fd627d252..eceb5d9b77 100644
--- a/crates/sqllogictest/src/schedule.rs
+++ b/crates/sqllogictests/src/schedule.rs
@@ -15,14 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::engine::Engine;
-use anyhow::anyhow;
-use itertools::Itertools;
 use std::collections::HashMap;
 use std::fs::read_to_string;
 use std::path::Path;
+
+use anyhow::anyhow;
+use itertools::Itertools;
 use toml::{Table, Value};
 
+use crate::engine::Engine;
+
 /// Schedule of engines to run tests.
 pub struct Schedule {
     /// Map of engine names to engine instances.
@@ -41,35 +43,37 @@ pub struct Step {
 
 impl Schedule {
     pub async fn parse<P: AsRef<Path>>(schedule_def_file: P) -> anyhow::Result<Self> {
         let content = read_to_string(schedule_def_file)?;
-        let toml_value = content.parse::<Value>()?.as_table()
+        let toml_value = content.parse::<Value>()?;
+        let toml_table = toml_value
+            .as_table()
             .ok_or_else(|| anyhow::anyhow!("Schedule file must be a TOML table"))?;
 
-        let engines = Schedule::parse_engines(toml_value).await?;
-        let steps = Schedule::parse_steps(toml_value).await?;
+        let engines = Schedule::parse_engines(toml_table).await?;
+        let steps = Schedule::parse_steps(toml_table).await?;
 
-        Ok(Self {
-            engines,
-            steps
-        })
+        Ok(Self { engines, steps })
     }
 
     async fn parse_engines(table: &Table) -> anyhow::Result<HashMap<String, Engine>> {
-        let engines = table.get("engines")
+        let engines = table
+            .get("engines")
             .ok_or_else(|| anyhow::anyhow!("Schedule file must have an 'engines' table"))?
             .as_table()
             .ok_or_else(|| anyhow::anyhow!("'engines' must be a table"))?;
 
         let mut result = HashMap::new();
         for (name, engine_config) in engines {
-            let engine_configs = engine_config.as_table()
+            let engine_configs = engine_config
+                .as_table()
                 .ok_or_else(|| anyhow::anyhow!("Config of engine {name} is not a table"))?;
-            let typ = engine_configs.get("type")
+            let typ = engine_configs
+                .get("type")
                 .ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a 'type' field"))?
                 .as_str()
                 .ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a string"))?;
 
-            let engine = Engine::build(typ, engine_configs).await?;
+            let engine = Engine::new(typ, engine_configs).await?;
 
             result.insert(name.clone(), engine);
         }
@@ -78,13 +82,13 @@ impl Schedule {
     }
 
     async fn parse_steps(table: &Table) -> anyhow::Result<Vec<Step>> {
-        let steps = table.get("steps")
+        let steps = table
+            .get("steps")
            .ok_or_else(|| anyhow!("steps not found"))?
            .as_array()
            .ok_or_else(|| anyhow!("steps is not array"))?;
 
-        steps.iter().map(Schedule::parse_step)
-            .try_collect()
+        steps.iter().map(Schedule::parse_step).try_collect()
     }
 
     fn parse_step(value: &Value) -> anyhow::Result<Step> {
         let t = value
             .as_table()
             .ok_or_else(|| anyhow!("Step must be a table!"))?;
 
-        let engine_name = t.get("engine")
+        let engine_name = t
+            .get("engine")
             .ok_or_else(|| anyhow!("Property engine is missing in step"))?
             .as_str()
             .ok_or_else(|| anyhow!("Property engine is not a string in step"))?
             .to_string();
 
-        let sql = t.get("sql")
+        let sql = t
+            .get("sql")
             .ok_or_else(|| anyhow!("Property sql is missing in step"))?
             .as_str()
             .ok_or_else(|| anyhow!("Property sql is not a string in step"))?
             .to_string();
 
-        Ok(Step {
-            engine_name,
-            sql,
-        })
+        Ok(Step { engine_name, sql })
     }
 
-    pub async fn run(mut self) -> anyhow::Result<()> {
+    pub async fn run(self) -> anyhow::Result<()> {
         for step_idx in 0..self.steps.len() {
             self.run_step(step_idx).await?;
         }
@@ -121,7 +124,9 @@ impl Schedule {
     async fn run_step(&self, step_index: usize) -> anyhow::Result<()> {
         let step = &self.steps[step_index];
 
-        let engine = self.engines.get(&step.engine_name)
+        let engine = self
+            .engines
+            .get(&step.engine_name)
             .ok_or_else(|| anyhow!("Engine {} not found!", step.engine_name))?
             .clone();
 
diff --git a/crates/sqllogictest/testdata/docker/docker-compose.yaml b/crates/sqllogictests/testdata/docker/docker-compose.yaml
similarity index 100%
rename from crates/sqllogictest/testdata/docker/docker-compose.yaml
rename to crates/sqllogictests/testdata/docker/docker-compose.yaml
diff --git a/crates/sqllogictest/testdata/docker/spark/spark-connect-server.sh b/crates/sqllogictests/testdata/docker/spark/spark-connect-server.sh
similarity index 100%
rename from crates/sqllogictest/testdata/docker/spark/spark-connect-server.sh
rename to crates/sqllogictests/testdata/docker/spark/spark-connect-server.sh
diff --git a/crates/sqllogictest/testdata/schedules/demo.toml b/crates/sqllogictests/testdata/schedules/demo.toml
similarity index 100%
rename from crates/sqllogictest/testdata/schedules/demo.toml
rename to crates/sqllogictests/testdata/schedules/demo.toml
diff --git a/crates/sqllogictest/testdata/slts/demo/prepare.slt b/crates/sqllogictests/testdata/slts/demo/prepare.slt
similarity index 100%
rename from crates/sqllogictest/testdata/slts/demo/prepare.slt
rename to crates/sqllogictests/testdata/slts/demo/prepare.slt
diff --git a/crates/sqllogictest/testdata/slts/demo/verify.slt b/crates/sqllogictests/testdata/slts/demo/verify.slt
similarity index 100%
rename from crates/sqllogictest/testdata/slts/demo/verify.slt
rename to crates/sqllogictests/testdata/slts/demo/verify.slt
diff --git a/crates/sqllogictest/tests/sqllogictests.rs b/crates/sqllogictests/tests/sqllogictests.rs
similarity index 74%
rename from crates/sqllogictest/tests/sqllogictests.rs
rename to crates/sqllogictests/tests/sqllogictests.rs
index 6a6939377b..07fb190714 100644
--- a/crates/sqllogictest/tests/sqllogictests.rs
+++ b/crates/sqllogictests/tests/sqllogictests.rs
@@ -1,9 +1,10 @@
 use std::fs;
 use std::path::PathBuf;
+
+use iceberg_test_utils::docker::DockerCompose;
 use libtest_mimic::{Arguments, Trial};
+use sqllogictests::schedule::Schedule;
 use tokio::runtime::Handle;
-use iceberg_test_utils::docker::DockerCompose;
-use sqllogictest::schedule::Schedule;
 
 fn main() {
     env_logger::init();
@@ -36,23 +37,34 @@ fn main() {
 }
 
 fn start_docker() -> anyhow::Result<DockerCompose> {
-    let docker = DockerCompose::new("sqllogictests",
-        format!("{}/testdata/docker", env!("CARGO_MANIFEST_DIR")));
+    let docker = DockerCompose::new(
+        "sqllogictests",
+        format!("{}/testdata/docker", env!("CARGO_MANIFEST_DIR")),
+    );
     docker.run();
     Ok(docker)
 }
 
 fn collect_trials(handle: Handle) -> anyhow::Result<Vec<Trial>> {
     let schedule_files = collect_schedule_files()?;
-    log::debug!("Found {} schedule files: {}", schedule_files.len(), &schedule_files);
+    log::debug!(
+        "Found {} schedule files: {:?}",
+        schedule_files.len(),
+        &schedule_files
+    );
     let mut trials = Vec::with_capacity(schedule_files.len());
     for schedule_file in schedule_files {
         let h = handle.clone();
-        let trial_name = format!("Test schedule {}",
-            schedule_file.file_name()
-                .expect("Schedule file should have a name")
-                .to_string_lossy());
-        let trial = Trial::new(trial_name, move || h.block_on(run_schedule(schedule_file.clone())));
+        let trial_name = format!(
+            "Test schedule {}",
+            schedule_file
+                .file_name()
+                .expect("Schedule file should have a name")
+                .to_string_lossy()
+        );
+        let trial = Trial::test(trial_name, move || {
+            Ok(h.block_on(run_schedule(schedule_file.clone()))?)
+        });
         trials.push(trial);
     }
     Ok(trials)
 }
 
@@ -75,4 +87,4 @@ async fn run_schedule(schedule_file: PathBuf) -> anyhow::Result<()> {
     let schedule = Schedule::parse(schedule_file).await?;
     schedule.run().await?;
     Ok(())
-}
\ No newline at end of file
+}

From f68250bddda1d866bb989449a168b4f888e5d228 Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Fri, 13 Sep 2024 16:27:05 +0800
Subject: [PATCH 06/10] Make formatter happy

---
 crates/sqllogictests/src/display/conversion_51.rs | 1 -
 crates/sqllogictests/src/display/normalize.rs     | 1 +
 crates/sqllogictests/src/display/normalize_51.rs  | 4 ++--
 crates/sqllogictests/src/engine/spark.rs          | 2 +-
 crates/sqllogictests/src/lib.rs                   | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/crates/sqllogictests/src/display/conversion_51.rs b/crates/sqllogictests/src/display/conversion_51.rs
index c5ef69c884..b1243e2af7 100644
--- a/crates/sqllogictests/src/display/conversion_51.rs
+++ b/crates/sqllogictests/src/display/conversion_51.rs
@@ -4,7 +4,6 @@
 use bigdecimal::BigDecimal;
 use half::f16;
 use rust_decimal::prelude::*;
 
-
 /// Represents a constant for NULL string in your database.
 pub const NULL_STR: &str = "NULL";
 
diff --git a/crates/sqllogictests/src/display/normalize.rs b/crates/sqllogictests/src/display/normalize.rs
index 2a1ffd9384..5e975ef375 100644
--- a/crates/sqllogictests/src/display/normalize.rs
+++ b/crates/sqllogictests/src/display/normalize.rs
@@ -23,6 +23,7 @@ use arrow::array::{
 use arrow::datatypes::{DataType, Fields};
 use arrow::util::display::ArrayFormatter;
 use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
+
 use crate::display::conversion::*;
 use crate::engine::output::DFColumnType;
 
diff --git a/crates/sqllogictests/src/display/normalize_51.rs b/crates/sqllogictests/src/display/normalize_51.rs
index dd620c23c8..f48744f22f 100644
--- a/crates/sqllogictests/src/display/normalize_51.rs
+++ b/crates/sqllogictests/src/display/normalize_51.rs
@@ -16,13 +16,13 @@
 // under the License.
 
 use anyhow::anyhow;
-use arrow_51::util::display::{DurationFormat, FormatOptions};
 use arrow_51::array::{
     ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array,
     Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray,
 };
 use arrow_51::datatypes::{DataType, Fields};
-use arrow_51::util::display::ArrayFormatter;
+use arrow_51::util::display::{ArrayFormatter, DurationFormat, FormatOptions};
+
 use crate::display::conversion_51::*;
 use crate::engine::output::DFColumnType;
 
diff --git a/crates/sqllogictests/src/engine/spark.rs b/crates/sqllogictests/src/engine/spark.rs
index 47d0ac32ec..f180219ce1 100644
--- a/crates/sqllogictests/src/engine/spark.rs
+++ b/crates/sqllogictests/src/engine/spark.rs
@@ -23,8 +23,8 @@ use spark_connect_rs::{SparkSession, SparkSessionBuilder};
 use sqllogictest::{AsyncDB, DBOutput};
 use toml::Table;
 
-use crate::engine::output::DFColumnType;
 use crate::display::normalize_51;
+use crate::engine::output::DFColumnType;
 use crate::error::{Error, Result};
 
 /// SparkSql engine implementation for sqllogictests.
diff --git a/crates/sqllogictests/src/lib.rs b/crates/sqllogictests/src/lib.rs
index de0d213973..d907257ca9 100644
--- a/crates/sqllogictests/src/lib.rs
+++ b/crates/sqllogictests/src/lib.rs
@@ -17,9 +17,9 @@
 // This lib contains codes copied from
 // [Apache Datafusion](https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest)
+mod display;
 mod engine;
 mod error;
 pub mod schedule;
-mod display;
 
 pub use error::*;

From 96266795fb12daf940cca69a6961b271fca3b023 Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Fri, 13 Sep 2024 16:30:23 +0800
Subject: [PATCH 07/10] Make formatter happy

---
 Cargo.toml                      |  2 +-
 crates/sqllogictests/Cargo.toml | 49 ++++++++++++++++-----------------
 2 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 7ef5e101b0..1deb4e806c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,8 +22,8 @@ members = [
   "crates/examples",
   "crates/iceberg",
   "crates/integrations/*",
-  "crates/test_utils",
   "crates/sqllogictests",
+  "crates/test_utils",
 ]
 exclude = ["bindings/python"]
 
diff --git a/crates/sqllogictests/Cargo.toml b/crates/sqllogictests/Cargo.toml
index b481125e53..f933d3ed97 100644
--- a/crates/sqllogictests/Cargo.toml
+++ b/crates/sqllogictests/Cargo.toml
@@ -1,43 +1,42 @@
 [package]
 name = "sqllogictests"
-version.workspace = true
-edition.workspace = true
-homepage.workspace = true
-repository.workspace = true
-license.workspace = true
-rust-version.workspace = true
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+rust-version = { workspace = true }
 
 [dependencies]
+anyhow = { workspace = true }
 arrow = { workspace = true }
 # For spark-connect-rs
-arrow_51 = { version = "51", package = "arrow"}
+arrow_51 = { version = "51", package = "arrow" }
 async-trait = { workspace = true }
-sqllogictest = "0.22"
-datafusion = { workspace = true, default-features = true}
-datafusion-common = { workspace = true, default-features = true}
-thiserror = "1.0.63"
-sqlparser = {workspace = true}
-itertools = "0.13.0"
-half = "2.4.1"
 bigdecimal = "0.4.1"
-rust_decimal = { version = "1.27.0" }
-tempfile = { workspace = true }
+datafusion = { workspace = true, default-features = true }
+datafusion-common = { workspace = true, default-features = true }
+env_logger = { workspace = true }
+half = "2.4.1"
+iceberg-catalog-rest = { path = "../catalog/rest" }
+iceberg-datafusion = { path = "../integrations/datafusion" }
+itertools = "0.13.0"
 log = "0.4.22"
+rust_decimal = { version = "1.27.0" }
 spark-connect-rs = "0.0.1-beta.5"
-anyhow = {workspace = true}
-toml = "0.8.19"
-url = {workspace = true}
-iceberg-datafusion = { path = "../integrations/datafusion" }
-iceberg-catalog-rest = { path = "../catalog/rest" }
+sqllogictest = "0.22"
+sqlparser = { workspace = true }
+tempfile = { workspace = true }
+thiserror = "1.0.63"
 tokio = "1.38.0"
-env_logger = { workspace = true }
+toml = "0.8.19"
+url = { workspace = true }
 
 [dev-dependencies]
-libtest-mimic = "0.7.3"
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
-
+libtest-mimic = "0.7.3"
 
 [[test]]
 harness = false
 name = "sqllogictests"
-path = "tests/sqllogictests.rs"
\ No newline at end of file
+path = "tests/sqllogictests.rs"

From 1bcafde4250c673a1635f722f29111607bdd1cf3 Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Fri, 13 Sep 2024 16:31:58 +0800
Subject: [PATCH 08/10] Make linter happy

---
 crates/sqllogictests/Cargo.toml | 4 ----
 1 file changed, 4 deletions(-)
diff --git a/crates/sqllogictests/Cargo.toml b/crates/sqllogictests/Cargo.toml
index f933d3ed97..77e46354fd 100644
--- a/crates/sqllogictests/Cargo.toml
+++ b/crates/sqllogictests/Cargo.toml
@@ -25,12 +25,8 @@ log = "0.4.22"
 rust_decimal = { version = "1.27.0" }
 spark-connect-rs = "0.0.1-beta.5"
 sqllogictest = "0.22"
-sqlparser = { workspace = true }
-tempfile = { workspace = true }
-thiserror = "1.0.63"
 tokio = "1.38.0"
 toml = "0.8.19"
-url = { workspace = true }
 
 [dev-dependencies]
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }

From c2efe599cee6fa2c38ddb5057bb1ee1f1cdfc141 Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Fri, 13 Sep 2024 17:07:57 +0800
Subject: [PATCH 09/10] Partial

---
 .../testdata/docker/docker-compose.yaml |  4 +--
 .../testdata/schedules/demo.toml        |  6 ++---
 .../testdata/slts/demo/prepare.slt      |  2 +-
 .../testdata/slts/demo/verify.slt       |  2 +-
 crates/test_utils/src/docker.rs         | 26 +++++++++----------
 5 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/crates/sqllogictests/testdata/docker/docker-compose.yaml b/crates/sqllogictests/testdata/docker/docker-compose.yaml
index 894b37ac7a..9d24069b4c 100644
--- a/crates/sqllogictests/testdata/docker/docker-compose.yaml
+++ b/crates/sqllogictests/testdata/docker/docker-compose.yaml
@@ -71,8 +71,8 @@ services:
       - AWS_REGION=us-east-1
       - SPARK_HOME=/opt/spark
       - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
-      - HTTP_PROXY=http://host.docker.internal:7890
-      - HTTPS_PROXY=http://host.docker.internal:7890
+#      - HTTP_PROXY=http://host.docker.internal:7890
+#      - HTTPS_PROXY=http://host.docker.internal:7890
     user: root
     links:
      - minio:icebergdata.minio
diff --git a/crates/sqllogictests/testdata/schedules/demo.toml b/crates/sqllogictests/testdata/schedules/demo.toml
index a795627420..ad4773d248 100644
--- a/crates/sqllogictests/testdata/schedules/demo.toml
+++ b/crates/sqllogictests/testdata/schedules/demo.toml
@@ -1,12 +1,12 @@
 [engines]
-spark = { type = "spark", url = "sc://localhost:7077" }
+spark = { type = "spark", url = "sc://localhost:15002" }
 df = { type = "datafusion", url = "http://localhost:8181" }
 
 [[steps]]
 engine = "spark"
-filename = "prepare.slt"
+sql = "demo/prepare.slt"
 
 [[steps]]
 engine = "df"
-filename = "verify.sql"
+sql = "demo/verify.slt"
diff --git a/crates/sqllogictests/testdata/slts/demo/prepare.slt b/crates/sqllogictests/testdata/slts/demo/prepare.slt
index 4b0fd89860..c7fb278d56 100644
--- a/crates/sqllogictests/testdata/slts/demo/prepare.slt
+++ b/crates/sqllogictests/testdata/slts/demo/prepare.slt
@@ -1,5 +1,5 @@
 statement ok
-CREATE SCHEMA s1 IF NOT EXISTS;
+CREATE SCHEMA IF NOT EXISTS s1;
 
 statement ok
 USE SCHEMA s1;
diff --git a/crates/sqllogictests/testdata/slts/demo/verify.slt b/crates/sqllogictests/testdata/slts/demo/verify.slt
index 7c932222b8..fd762d2480 100644
--- a/crates/sqllogictests/testdata/slts/demo/verify.slt
+++ b/crates/sqllogictests/testdata/slts/demo/verify.slt
@@ -1,5 +1,5 @@
 query II rowsort
-SELECT * FROM s1.t1;
+SELECT * FROM demo.s1.t1;
 ----
 1 2
 
diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs
index bde9737b17..6d2de35777 100644
--- a/crates/test_utils/src/docker.rs
+++ b/crates/test_utils/src/docker.rs
@@ -102,13 +102,11 @@ impl DockerCompose {
         let ip_result = get_cmd_output(cmd, format!("Get container ip of {container_name}"))
             .trim()
             .parse::<IpAddr>();
-        match ip_result {
-            Ok(ip) => ip,
-            Err(e) => {
-                log::error!("Invalid IP, {e}");
-                panic!("Failed to parse IP for {container_name}")
-            }
-        }
+
+        ip_result.unwrap_or_else(|e| {
+            log::error!("Invalid IP, {e}");
+            panic!("Failed to parse IP for {container_name}")
+        })
     }
 }
 
@@ -126,12 +124,12 @@ impl Drop for DockerCompose {
             "--remove-orphans",
         ]);
 
-        run_command(
-            cmd,
-            format!(
-                "Stopping docker compose in {}, project name: {}",
-                self.docker_compose_dir, self.project_name
-            ),
-        )
+        // run_command(
+        //     cmd,
+        //     format!(
+        //         "Stopping docker compose in {}, project name: {}",
+        //         self.docker_compose_dir, self.project_name
+        //     ),
+        // )
     }
 }

From 54c746d91f9bd24fe794b5a6789a333f4a7d949b Mon Sep 17 00:00:00 2001
From: liurenjie1024
Date: Fri, 13 Sep 2024 18:09:35 +0800
Subject: [PATCH 10/10] Works

---
 crates/sqllogictests/src/engine/datafusion.rs      | 7 +++++++
 crates/sqllogictests/testdata/slts/demo/verify.slt | 2 +-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/crates/sqllogictests/src/engine/datafusion.rs b/crates/sqllogictests/src/engine/datafusion.rs
index 46cec9166b..19830fdc3b 100644
--- a/crates/sqllogictests/src/engine/datafusion.rs
+++ b/crates/sqllogictests/src/engine/datafusion.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -108,6 +109,12 @@ impl DataFusionEngine {
 
         let rest_catalog_config = RestCatalogConfig::builder()
             .uri(rest_catalog_url.to_string())
+            .props(HashMap::from([
+                ("s3.endpoint".to_string(), "http://localhost:9000".to_string()),
+                ("s3.access-key-id".to_string(), "admin".to_string()),
+                ("s3.secret-access-key".to_string(), "password".to_string()),
+                ("s3.region".to_string(), "us-east-1".to_string()),
+            ]))
             .build();
 
         let rest_catalog = RestCatalog::new(rest_catalog_config);
diff --git a/crates/sqllogictests/testdata/slts/demo/verify.slt b/crates/sqllogictests/testdata/slts/demo/verify.slt
index fd762d2480..db0b1420b1 100644
--- a/crates/sqllogictests/testdata/slts/demo/verify.slt
+++ b/crates/sqllogictests/testdata/slts/demo/verify.slt
@@ -1,4 +1,4 @@
-query II rowsort
+query I rowsort
 SELECT * FROM demo.s1.t1;
 ----
 1
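
Note for reviewers: end to end, the series boils down to "bring up the Docker services, parse a schedule, run its steps in order". Below is a minimal sketch of driving a schedule directly, outside the libtest-mimic harness in tests/sqllogictests.rs. It assumes the Schedule::parse / Schedule::run API introduced above, the bundled demo schedule, an already-running testdata/docker compose stack, and tokio built with the rt-multi-thread and macros features; the standalone binary itself is illustrative and not part of this series.

    use sqllogictests::schedule::Schedule;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // A schedule is a TOML file with an [engines] table (here: Spark
        // Connect and DataFusion over the REST catalog) and ordered [[steps]],
        // each pointing an engine at an .slt file, as in demo.toml above.
        let schedule = Schedule::parse("testdata/schedules/demo.toml").await?;
        // Steps run sequentially against their configured engines; the first
        // failing statement or query aborts the run with an error.
        schedule.run().await?;
        Ok(())
    }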