diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 189be1d6a30d2..b0d6dcafa2460 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -271,13 +271,7 @@ class Dataset[T] private[sql](
   private[sql] def getRows(
       numRows: Int,
       truncate: Int): Seq[Seq[String]] = {
-    val newDf = logicalPlan match {
-      case c: CommandResult =>
-        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
-        // avoid triggering a job
-        Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
-      case _ => toDF()
-    }
+    val newDf = commandResultOptimized.toDF()
     val castCols = newDf.logicalPlan.output.map { col =>
       Column(ToPrettyString(col))
     }
@@ -655,7 +649,8 @@ class Dataset[T] private[sql](
    * @group basic
    * @since 2.4.0
    */
-  def isEmpty: Boolean = withAction("isEmpty", select().limit(1).queryExecution) { plan =>
+  def isEmpty: Boolean = withAction("isEmpty",
+    commandResultOptimized.select().limit(1).queryExecution) { plan =>
     plan.executeTake(1).isEmpty
   }
 
@@ -4483,6 +4478,17 @@ class Dataset[T] private[sql](
     }
   }
 
+  /** Returns an optimized Dataset where a `CommandResult` is converted to a `LocalRelation`. */
+  private def commandResultOptimized: Dataset[T] = {
+    logicalPlan match {
+      case c: CommandResult =>
+        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
+        // avoid triggering a job
+        Dataset(sparkSession, LocalRelation(c.output, c.rows))
+      case _ => this
+    }
+  }
+
   /** Convert to an RDD of serialized ArrowRecordBatches. */
   private[sql] def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
     val schemaCaptured = this.schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index fe295b0cfa26a..c16de6f80307c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2717,6 +2717,25 @@ class DatasetSuite extends QueryTest
     checkDataset(ds.map(t => t),
       WithSet(0, HashSet("foo", "bar")), WithSet(1, HashSet("bar", "zoo")))
   }
+
+  test("SPARK-47270: isEmpty does not trigger job execution on CommandResults") {
+    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
+      withTable("t1") {
+        sql("create table t1(c int) using parquet")
+
+        @volatile var jobCounter = 0
+        val listener = new SparkListener {
+          override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+            jobCounter += 1
+          }
+        }
+        withListener(spark.sparkContext, listener) { _ =>
+          sql("show tables").isEmpty
+        }
+        assert(jobCounter === 0)
+      }
+    }
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest
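
For reference, a minimal standalone sketch of the behaviour the new test asserts (not part of the patch; the object name, app name, and master URL are illustrative). It registers a SparkListener through the public addSparkListener API, calls isEmpty on the eagerly executed SHOW TABLES command, and expects the job counter to stay at zero once this change is in place. Listener delivery is asynchronous, so the counter is read only after the session is stopped and the listener bus has drained.

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

object IsEmptyOnCommandResult {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session used only for this sketch.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("isEmpty-on-command-result")
      .getOrCreate()

    // Count every job submitted to the scheduler.
    @volatile var jobsStarted = 0
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobsStarted += 1
    })

    // SHOW TABLES is executed eagerly and wrapped in a CommandResult plan; with the
    // patch, isEmpty inspects those rows via a LocalRelation instead of running a job.
    val empty = spark.sql("SHOW TABLES").isEmpty
    println(s"isEmpty = $empty")

    // Stopping the session shuts down the listener bus, draining queued events.
    spark.stop()
    println(s"jobs started: $jobsStarted (expected 0 with the patch applied)")
  }
}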