From a3fcbbc942c712cc4a78141ee4b532e7246e0545 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 31 Oct 2024 17:26:07 -0700 Subject: [PATCH 1/3] do not trigger job --- .../scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala index 8c19ea5c252..2aa3d7e4bc6 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala @@ -48,7 +48,7 @@ object KyuubiSparkUtil extends Logging { interruptOnCancel = true) debug(s"Execute initialization sql: $sql") try { - spark.sql(sql).isEmpty + spark.sql(sql).take(1).isEmpty } finally { spark.sparkContext.clearJobGroup() } From 7b67d5d2ab3855716b95c8c507e3a6773854f3a7 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 31 Oct 2024 18:06:08 -0700 Subject: [PATCH 2/3] fix ut --- .../org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala index 1013522092b..dd05dbb1e34 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION 
trait WithSparkSQLEngine extends KyuubiFunSuite { protected var spark: SparkSession = _ @@ -35,7 +34,9 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { // Behavior is affected by the initialization SQL: 'SHOW DATABASES' // SPARK-35378 (3.2.0) makes it triggers job // SPARK-43124 (4.0.0) makes it avoid triggering job - protected val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "4.0") 0 else 1 + // KYUUBI #6789 makes it avoid triggering job + // protected val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "4.0") 0 else 1 + protected val initJobId: Int = 0 override def beforeAll(): Unit = { startSparkEngine() From da4df405513196b9f36f10137d6d911f2d3549f6 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 31 Oct 2024 18:55:43 -0700 Subject: [PATCH 3/3] [SPARK-47270][SQL] Dataset.isEmpty projects CommandResults locally --- .../apache/kyuubi/engine/spark/KyuubiSparkUtil.scala | 3 ++- .../apache/spark/sql/kyuubi/SparkDatasetHelper.scala | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala index 2aa3d7e4bc6..36eadd16c8a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala @@ -24,6 +24,7 @@ import scala.annotation.meta.getter import org.apache.spark.{SPARK_VERSION, SparkContext} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.kyuubi.SparkDatasetHelper import org.apache.spark.util.kvstore.KVIndex import org.apache.kyuubi.Logging @@ -48,7 +49,7 @@ object KyuubiSparkUtil extends Logging { interruptOnCancel = true) debug(s"Execute initialization sql: $sql") try { - spark.sql(sql).take(1).isEmpty + 
SparkDatasetHelper.commandResultOptimized(spark.sql(sql)).isEmpty } finally { spark.sparkContext.clearJobGroup() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index dda7bb4d0ae..62166f1f0f6 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.util.{ByteUnit, JavaUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit +import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, GlobalLimit, LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec @@ -290,4 +290,13 @@ object SparkDatasetHelper extends Logging { nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" || nodeName == "org.apache.spark.sql.execution.CommandResultExec" } + + /** SPARK-47270: Returns an optimized plan for CommandResult, converted to `LocalRelation`. */ + def commandResultOptimized[T](dataset: Dataset[T]): Dataset[T] = { + dataset.logicalPlan match { + case c: CommandResult => + Dataset(dataset.sparkSession, LocalRelation(c.output, c.rows))(dataset.encoder) + case _ => dataset + } + } }