From e430e24baf23005e7612cb2cc1b3dafbc29d4aae Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 24 Apr 2025 15:18:24 +0800 Subject: [PATCH 1/5] Introduce scheduler for sqllogictests --- Cargo.lock | 2 + crates/sqllogictest/Cargo.toml | 2 + crates/sqllogictest/src/engine/datafusion.rs | 24 ++--- crates/sqllogictest/src/engine/mod.rs | 35 ++++++- crates/sqllogictest/src/lib.rs | 2 + crates/sqllogictest/src/schedule.rs | 105 +++++++++++++++++++ 6 files changed, 156 insertions(+), 14 deletions(-) create mode 100644 crates/sqllogictest/src/schedule.rs diff --git a/Cargo.lock b/Cargo.lock index 371ddb65d3..eaa142107b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3727,8 +3727,10 @@ dependencies = [ "datafusion-sqllogictest", "enum-ordinalize", "indicatif", + "serde", "sqllogictest", "toml", + "tracing", ] [[package]] diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index ba149daeab..d07d5c3e87 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -33,6 +33,8 @@ enum-ordinalize = { workspace = true } indicatif = { workspace = true } sqllogictest = { workspace = true } toml = { workspace = true } +serde = { workspace = true } +tracing = {workspace = true} [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index f95cfb247d..49d7273d60 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -26,7 +26,7 @@ use indicatif::ProgressBar; use sqllogictest::runner::AsyncDB; use toml::Table as TomlTable; -use crate::engine::Engine; +use crate::engine::EngineRunner; use crate::error::Result; pub struct DataFusionEngine { @@ -34,17 +34,7 @@ pub struct DataFusionEngine { } #[async_trait::async_trait] -impl Engine for DataFusionEngine { - async fn new(config: TomlTable) -> Result { - let session_config = SessionConfig::new().with_target_partitions(4); - let ctx = SessionContext::new_with_config(session_config); - ctx.register_catalog("default", Self::create_catalog(&config).await?); - - Ok(Self { - datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)), - }) - } - +impl EngineRunner for DataFusionEngine { async fn run_slt_file(&mut self, path: &Path) -> Result<()> { let content = std::fs::read_to_string(path) .with_context(|| format!("Failed to read slt file {:?}", path)) @@ -61,6 +51,16 @@ impl Engine for DataFusionEngine { } impl DataFusionEngine { + pub async fn new(config: TomlTable) -> Result { + let session_config = SessionConfig::new().with_target_partitions(4); + let ctx = SessionContext::new_with_config(session_config); + ctx.register_catalog("default", Self::create_catalog(&config).await?); + + Ok(Self { + datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)), + }) + } + async fn create_catalog(_: &TomlTable) -> anyhow::Result> { todo!() } diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index 61722f663f..0d576223e3 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -21,10 +21,41 @@ use std::path::Path; use toml::Table as TomlTable; +use crate::engine::datafusion::DataFusionEngine; use crate::error::Result; +const KEY_TYPE: &str = "type"; +const TYPE_DATAFUSION: &str = "datafusion"; + #[async_trait::async_trait] -pub trait Engine: Sized { - async fn new(config: TomlTable) -> Result; +pub trait EngineRunner: Sized { async fn run_slt_file(&mut self, path: &Path) -> Result<()>; } + +pub enum Engine { + DataFusion(DataFusionEngine), +} + +impl Engine { + pub async fn new(config: TomlTable) -> Result { + let engine_type = config + .get(KEY_TYPE) + .ok_or_else(|| anyhow::anyhow!("Missing required key: {KEY_TYPE}"))? + .as_str() + .ok_or_else(|| anyhow::anyhow!("Config value for {KEY_TYPE} must be a string"))?; + + match engine_type { + TYPE_DATAFUSION => { + let engine = DataFusionEngine::new(config).await?; + Ok(Engine::DataFusion(engine)) + } + _ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()), + } + } + + pub async fn run_slt_file(&mut self, path: &Path) -> Result<()> { + match self { + Engine::DataFusion(engine) => engine.run_slt_file(path).await, + } + } +} diff --git a/crates/sqllogictest/src/lib.rs b/crates/sqllogictest/src/lib.rs index c72d50c429..7b17727d1a 100644 --- a/crates/sqllogictest/src/lib.rs +++ b/crates/sqllogictest/src/lib.rs @@ -22,3 +22,5 @@ mod engine; #[allow(dead_code)] mod error; +#[allow(dead_code)] +mod schedule; diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs new file mode 100644 index 0000000000..f1dedf9a8a --- /dev/null +++ b/crates/sqllogictest/src/schedule.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +use crate::engine::Engine; + +pub struct Schedule { + engines: HashMap, + steps: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Step { + /// Engine name + engine: String, + /// Stl file path + slt: String, +} + +impl Schedule { + pub fn new(engines: HashMap, steps: Vec) -> Self { + Self { engines, steps } + } + + pub async fn run(mut self) -> anyhow::Result<()> { + for (idx, step) in self.steps.iter().enumerate() { + tracing::info!( + "Running step {}/{}, using engine {}, slt file path: {}", + idx + 1, + self.steps.len(), + &step.engine, + &step.slt + ); + + let engine = self + .engines + .get_mut(&step.engine) + .ok_or_else(|| anyhow::anyhow!("Engine {} not found", step.engine))?; + + engine + .run_slt_file(&PathBuf::from(step.slt.clone())) + .await?; + tracing::info!( + "Step {}/{}, engine {}, slt file path: {} finished", + idx + 1, + self.steps.len(), + &step.engine, + &step.slt + ); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use toml::Table as TomlTable; + + use crate::schedule::Step; + + #[test] + fn test_parse_steps() { + let steps = r#" + [[steps]] + engine = "datafusion" + slt = "test.slt" + + [[steps]] + engine = "spark" + slt = "test2.slt" + "#; + + let steps: Vec = toml::from_str::(steps) + .unwrap() + .get("steps") + .unwrap() + .clone() + .try_into() + .unwrap(); + + assert_eq!(steps.len(), 2); + assert_eq!(steps[0].engine, "datafusion"); + assert_eq!(steps[0].slt, "test.slt"); + assert_eq!(steps[1].engine, "spark"); + assert_eq!(steps[1].slt, "test2.slt"); + } +} From 8ff74fefa2fb05b14a9922c16bf3aba32470c906 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Mon, 25 Aug 2025 13:50:34 -0700 Subject: [PATCH 2/5] Add schedule definition and parsing in sql logic test --- Cargo.lock | 3 + crates/sqllogictest/Cargo.toml | 7 +- crates/sqllogictest/src/engine/mod.rs | 27 +++++++ crates/sqllogictest/src/schedule.rs | 106 ++++++++++++++++++++++---- 4 files changed, 128 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eaa142107b..508e180d16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3728,7 +3728,10 @@ dependencies = [ "enum-ordinalize", "indicatif", "serde", + "serde_json", "sqllogictest", + "tempfile", + "tokio", "toml", "tracing", ] diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index d07d5c3e87..7eb27d4ca0 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -34,7 +34,12 @@ indicatif = { workspace = true } sqllogictest = { workspace = true } toml = { workspace = true } serde = { workspace = true } -tracing = {workspace = true} +tracing = { workspace = true } +tokio = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } +serde_json = { workspace = true } [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index 0d576223e3..a1d34dd9bc 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -59,3 +59,30 @@ impl Engine { } } } + +#[cfg(test)] +mod tests { + use toml::Table as TomlTable; + + use crate::engine::Engine; + + #[tokio::test] + async fn test_engine_new_missing_type_key() { + let config = TomlTable::new(); + let result = Engine::new(config).await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_engine_invalid_type() { + let input = r#" + [engines] + random = { type = "random_engine", url = "http://localhost:8181" } + "#; + let tbl = toml::from_str(input).unwrap(); + let result = Engine::new(tbl).await; + + assert!(result.is_err()); + } +} diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index f1dedf9a8a..aeaabef8ac 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -16,18 +16,24 @@ // under the License. use std::collections::HashMap; -use std::path::PathBuf; +use std::fs::read_to_string; +use std::path::{Path, PathBuf}; +use anyhow::{Context, anyhow}; use serde::{Deserialize, Serialize}; +use toml::{Table as TomlTable, Value}; +use tracing::info; use crate::engine::Engine; pub struct Schedule { + /// Engine names to engine instances engines: HashMap, + /// List of test steps to run steps: Vec, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Step { /// Engine name engine: String, @@ -40,9 +46,22 @@ impl Schedule { Self { engines, steps } } + pub async fn from_file>(path: P) -> anyhow::Result { + let content = read_to_string(path)?; + let toml_value = content.parse::()?; + let toml_table = toml_value + .as_table() + .ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?; + + let engines = Schedule::parse_engines(toml_table).await?; + let steps = Schedule::parse_steps(toml_table)?; + + Ok(Self::new(engines, steps)) + } + pub async fn run(mut self) -> anyhow::Result<()> { for (idx, step) in self.steps.iter().enumerate() { - tracing::info!( + info!( "Running step {}/{}, using engine {}, slt file path: {}", idx + 1, self.steps.len(), @@ -53,12 +72,12 @@ impl Schedule { let engine = self .engines .get_mut(&step.engine) - .ok_or_else(|| anyhow::anyhow!("Engine {} not found", step.engine))?; + .ok_or_else(|| anyhow!("Engine {} not found", step.engine))?; engine .run_slt_file(&PathBuf::from(step.slt.clone())) .await?; - tracing::info!( + info!( "Step {}/{}, engine {}, slt file path: {} finished", idx + 1, self.steps.len(), @@ -68,17 +87,57 @@ impl Schedule { } Ok(()) } + + async fn parse_engines(table: &TomlTable) -> anyhow::Result> { + let engines_tbl = table + .get("engines") + .with_context(|| "Schedule file must have an 'engines' table")? + .as_table() + .ok_or_else(|| anyhow!("'engines' must be a table"))?; + + let mut engines = HashMap::new(); + + for (name, engine_val) in engines_tbl { + let cfg_tbl = engine_val + .as_table() + .ok_or_else(|| anyhow!("Config of engine '{name}' is not a table"))? + .clone(); + + let engine = Engine::new(cfg_tbl) + .await + .with_context(|| format!("Failed to construct engine '{name}'"))?; + + if engines.insert(name.clone(), engine).is_some() { + return Err(anyhow!("Duplicate engine '{name}'")); + } + } + + Ok(engines) + } + + fn parse_steps(table: &TomlTable) -> anyhow::Result> { + let steps_val = table + .get("steps") + .with_context(|| "Schedule file must have a 'steps' array")?; + + let steps: Vec = steps_val + .clone() + .try_into() + .with_context(|| "Failed to deserialize steps")?; + + Ok(steps) + } } #[cfg(test)] mod tests { use toml::Table as TomlTable; - use crate::schedule::Step; + use crate::schedule::Schedule; #[test] fn test_parse_steps() { - let steps = r#" + let input = r#" [[steps]] engine = "datafusion" slt = "test.slt" @@ -88,13 +147,8 @@ mod tests { slt = "test2.slt" "#; - let steps: Vec = toml::from_str::(steps) - .unwrap() - .get("steps") - .unwrap() - .clone() - .try_into() - .unwrap(); + let tbl: TomlTable = toml::from_str(input).unwrap(); + let steps = Schedule::parse_steps(&tbl).unwrap(); assert_eq!(steps.len(), 2); assert_eq!(steps[0].engine, "datafusion"); @@ -102,4 +156,28 @@ mod tests { assert_eq!(steps[1].engine, "spark"); assert_eq!(steps[1].slt, "test2.slt"); } + + #[test] + fn test_parse_steps_empty() { + let input = r#" + [[steps]] + "#; + + let tbl: TomlTable = toml::from_str(input).unwrap(); + let steps = Schedule::parse_steps(&tbl); + + assert!(steps.is_err()); + } + + #[tokio::test] + async fn test_parse_engines_invalid_table() { + let toml_content = r#" + engines = "not_a_table" + "#; + + let table: TomlTable = toml::from_str(toml_content).unwrap(); + let result = Schedule::parse_engines(&table).await; + + assert!(result.is_err()); + } } From c60e4d8bd8f8d414336cdbe507cd9e0f1a229d11 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Mon, 25 Aug 2025 13:56:54 -0700 Subject: [PATCH 3/5] Remove unnecessary deps --- Cargo.lock | 2 -- crates/sqllogictest/Cargo.toml | 4 ---- 2 files changed, 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 508e180d16..bdea7e29e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3728,9 +3728,7 @@ dependencies = [ "enum-ordinalize", "indicatif", "serde", - "serde_json", "sqllogictest", - "tempfile", "tokio", "toml", "tracing", diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index 7eb27d4ca0..e335de433e 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -37,10 +37,6 @@ serde = { workspace = true } tracing = { workspace = true } tokio = { workspace = true } -[dev-dependencies] -tempfile = { workspace = true } -serde_json = { workspace = true } - [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version ignored = ["enum-ordinalize"] From 7a5120e2de8719b7c3d3dc8ff99be818393ddf80 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Fri, 29 Aug 2025 00:10:21 +0000 Subject: [PATCH 4/5] Update logging & slt relative path --- crates/sqllogictest/src/schedule.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index aeaabef8ac..06e2bc45f4 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -31,6 +31,8 @@ pub struct Schedule { engines: HashMap, /// List of test steps to run steps: Vec, + /// Path of the schedule file + schedule_file: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -42,11 +44,16 @@ pub struct Step { } impl Schedule { - pub fn new(engines: HashMap, steps: Vec) -> Self { - Self { engines, steps } + pub fn new(engines: HashMap, steps: Vec, schedule_file: String) -> Self { + Self { + engines, + steps, + schedule_file, + } } pub async fn from_file>(path: P) -> anyhow::Result { + let path_str = path.as_ref().to_string_lossy().to_string(); let content = read_to_string(path)?; let toml_value = content.parse::()?; let toml_table = toml_value @@ -56,10 +63,12 @@ impl Schedule { let engines = Schedule::parse_engines(toml_table).await?; let steps = Schedule::parse_steps(toml_table)?; - Ok(Self::new(engines, steps)) + Ok(Self::new(engines, steps, path_str)) } pub async fn run(mut self) -> anyhow::Result<()> { + info!("Starting test run with schedule: {}", self.schedule_file); + for (idx, step) in self.steps.iter().enumerate() { info!( "Running step {}/{}, using engine {}, slt file path: {}", @@ -74,11 +83,16 @@ impl Schedule { .get_mut(&step.engine) .ok_or_else(|| anyhow!("Engine {} not found", step.engine))?; - engine - .run_slt_file(&PathBuf::from(step.slt.clone())) - .await?; + let step_sql_path = PathBuf::from(format!( + "{}/testdata/slts/{}", + env!("CARGO_MANIFEST_DIR"), + &step.slt + )); + + engine.run_slt_file(&PathBuf::from(step_sql_path)).await?; + info!( - "Step {}/{}, engine {}, slt file path: {} finished", + "Completed step {}/{}, engine {}, slt file path: {}", idx + 1, self.steps.len(), &step.engine, From 3591446bde4da40b10e98f202cef21ada01aef3e Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Tue, 2 Sep 2025 03:47:41 +0000 Subject: [PATCH 5/5] Update redundant pathbuf --- crates/sqllogictest/src/schedule.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index 06e2bc45f4..d3284949bc 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -89,7 +89,7 @@ impl Schedule { &step.slt )); - engine.run_slt_file(&PathBuf::from(step_sql_path)).await?; + engine.run_slt_file(&step_sql_path).await?; info!( "Completed step {}/{}, engine {}, slt file path: {}",