From 25383e6e62113caa87be554dd35d3cd981e9facd Mon Sep 17 00:00:00 2001
From: Zhen Wang <643348094@qq.com>
Date: Mon, 4 Mar 2024 19:58:57 +0800
Subject: [PATCH 1/4] [SPARK-47270][SQL] Dataset.isEmpty projects CommandResults locally

---
 .../scala/org/apache/spark/sql/Dataset.scala  | 13 +++++++++++--
 .../org/apache/spark/sql/DatasetSuite.scala   | 19 +++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 189be1d6a30d2..508e6d4bd31ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -655,8 +655,17 @@ class Dataset[T] private[sql](
    * @group basic
    * @since 2.4.0
    */
-  def isEmpty: Boolean = withAction("isEmpty", select().limit(1).queryExecution) { plan =>
-    plan.executeTake(1).isEmpty
+  def isEmpty: Boolean = {
+    val newDf = logicalPlan match {
+      case c: CommandResult =>
+        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
+        // avoid triggering a job
+        Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
+      case _ => toDF()
+    }
+    withAction("isEmpty", newDf.select().limit(1).queryExecution) { plan =>
+      plan.executeTake(1).isEmpty
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index fe295b0cfa26a..c16de6f80307c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2717,6 +2717,25 @@ class DatasetSuite extends QueryTest
     checkDataset(ds.map(t => t),
       WithSet(0, HashSet("foo", "bar")), WithSet(1, HashSet("bar", "zoo")))
   }
+
+  test("SPARK-47270: isEmpty does not trigger job execution on CommandResults") {
+    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
+      withTable("t1") {
+        sql("create table t1(c int) using parquet")
+
+        @volatile var jobCounter = 0
+        val listener = new SparkListener {
+          override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+            jobCounter += 1
+          }
+        }
+        withListener(spark.sparkContext, listener) { _ =>
+          sql("show tables").isEmpty
+        }
+        assert(jobCounter === 0)
+      }
+    }
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest

From 012b11f46403602da8a1a2584014828216612b7b Mon Sep 17 00:00:00 2001
From: Zhen Wang <643348094@qq.com>
Date: Mon, 4 Mar 2024 20:10:04 +0800
Subject: [PATCH 2/4] no need toDF

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 508e6d4bd31ce..4762d1269d41e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -656,14 +656,14 @@ class Dataset[T] private[sql](
    * @since 2.4.0
    */
   def isEmpty: Boolean = {
-    val newDf = logicalPlan match {
+    val newDs = logicalPlan match {
       case c: CommandResult =>
         // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
         // avoid triggering a job
         Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
-      case _ => toDF()
+      case _ => this
     }
-    withAction("isEmpty", newDf.select().limit(1).queryExecution) { plan =>
+    withAction("isEmpty", newDs.select().limit(1).queryExecution) { plan =>
       plan.executeTake(1).isEmpty
     }
   }
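The behaviour exercised by the new test can also be checked interactively. Below is a minimal sketch for spark-shell, assuming a running SparkSession named `spark`; the listener-based job counting mirrors the test added in PATCH 1/4 and is not part of the patches themselves:

    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    // Count every job submitted while isEmpty is evaluated.
    val jobs = new AtomicInteger(0)
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobs.incrementAndGet()
    }
    spark.sparkContext.addSparkListener(listener)
    try {
      // SHOW TABLES produces a CommandResult whose rows already live on the driver,
      // so with this change isEmpty is answered locally instead of scheduling a job.
      val empty = spark.sql("SHOW TABLES").isEmpty
      println(s"isEmpty = $empty, jobs started = ${jobs.get()}") // expect 0 jobs
    } finally {
      spark.sparkContext.removeSparkListener(listener)
    }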
From e74809bb0936d3dd38e7e0e2c1ba83a5c437847a Mon Sep 17 00:00:00 2001
From: Zhen Wang <643348094@qq.com>
Date: Tue, 5 Mar 2024 14:31:55 +0800
Subject: [PATCH 3/4] add withCommandResultOptimized

---
 .../scala/org/apache/spark/sql/Dataset.scala | 33 +++++++++----------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 4762d1269d41e..a5bd14bbe2a2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -271,13 +271,7 @@ class Dataset[T] private[sql](
   private[sql] def getRows(
       numRows: Int,
       truncate: Int): Seq[Seq[String]] = {
-    val newDf = logicalPlan match {
-      case c: CommandResult =>
-        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
-        // avoid triggering a job
-        Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
-      case _ => toDF()
-    }
+    val newDf = withCommandResultOptimized.toDF()
     val castCols = newDf.logicalPlan.output.map { col =>
       Column(ToPrettyString(col))
     }
@@ -655,17 +649,9 @@ class Dataset[T] private[sql](
    * @group basic
    * @since 2.4.0
    */
-  def isEmpty: Boolean = {
-    val newDs = logicalPlan match {
-      case c: CommandResult =>
-        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
-        // avoid triggering a job
-        Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
-      case _ => this
-    }
-    withAction("isEmpty", newDs.select().limit(1).queryExecution) { plan =>
-      plan.executeTake(1).isEmpty
-    }
+  def isEmpty: Boolean = withAction("isEmpty",
+    withCommandResultOptimized.select().limit(1).queryExecution) { plan =>
+    plan.executeTake(1).isEmpty
   }
 
   /**
@@ -4492,6 +4478,17 @@ class Dataset[T] private[sql](
     }
   }
 
+  /** Returns an optimized plan for CommandResult, converted to `LocalRelation`. */
+  private def withCommandResultOptimized: Dataset[T] = {
+    logicalPlan match {
+      case c: CommandResult =>
+        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
+        // avoid triggering a job
+        Dataset(sparkSession, LocalRelation(c.output, c.rows))
+      case _ => this
+    }
+  }
+
   /** Convert to an RDD of serialized ArrowRecordBatches. */
   private[sql] def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
     val schemaCaptured = this.schema
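The rewrite avoids a job because the rows of a CommandResult are already materialized on the driver: once they are wrapped in a LocalRelation, the existing optimizer rule ConvertToLocalRelation folds the subsequent select()/limit(1) back into a LocalRelation, and executeTake is served from driver-side rows. A small illustration of that pre-existing behaviour, assuming a SparkSession named `spark` with default optimizer rules and not part of the patches:

    import spark.implicits._

    // A Dataset built from a local Seq is backed by a LocalRelation, just like the
    // CommandResult rows after the rewrite in withCommandResultOptimized.
    val local = Seq(1, 2, 3).toDF("c")

    // select().limit(1) collapses back into a LocalRelation during optimization...
    println(local.select().limit(1).queryExecution.optimizedPlan)
    // ...so isEmpty can be answered on the driver without launching a job.
    println(local.isEmpty) // false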
From 6b5210440a8a27601c155f91d17f135dcf96ed0d Mon Sep 17 00:00:00 2001
From: Zhen Wang <643348094@qq.com>
Date: Wed, 6 Mar 2024 09:47:09 +0800
Subject: [PATCH 4/4] address comments

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a5bd14bbe2a2e..b0d6dcafa2460 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -271,7 +271,7 @@ class Dataset[T] private[sql](
   private[sql] def getRows(
       numRows: Int,
       truncate: Int): Seq[Seq[String]] = {
-    val newDf = withCommandResultOptimized.toDF()
+    val newDf = commandResultOptimized.toDF()
     val castCols = newDf.logicalPlan.output.map { col =>
       Column(ToPrettyString(col))
     }
@@ -650,7 +650,7 @@ class Dataset[T] private[sql](
    * @since 2.4.0
    */
   def isEmpty: Boolean = withAction("isEmpty",
-    withCommandResultOptimized.select().limit(1).queryExecution) { plan =>
+    commandResultOptimized.select().limit(1).queryExecution) { plan =>
     plan.executeTake(1).isEmpty
   }
 
@@ -4479,7 +4479,7 @@ class Dataset[T] private[sql](
   }
 
   /** Returns an optimized plan for CommandResult, converted to `LocalRelation`. */
-  private def withCommandResultOptimized: Dataset[T] = {
+  private def commandResultOptimized: Dataset[T] = {
     logicalPlan match {
       case c: CommandResult =>
         // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to