22 changes: 14 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -271,13 +271,7 @@ class Dataset[T] private[sql](
private[sql] def getRows(
numRows: Int,
truncate: Int): Seq[Seq[String]] = {
val newDf = logicalPlan match {
case c: CommandResult =>
// Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
// avoid triggering a job
Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
case _ => toDF()
}
val newDf = commandResultOptimized.toDF()
val castCols = newDf.logicalPlan.output.map { col =>
Column(ToPrettyString(col))
}
@@ -655,7 +649,8 @@ class Dataset[T] private[sql](
* @group basic
* @since 2.4.0
*/
def isEmpty: Boolean = withAction("isEmpty", select().limit(1).queryExecution) { plan =>
def isEmpty: Boolean = withAction("isEmpty",
commandResultOptimized.select().limit(1).queryExecution) { plan =>
plan.executeTake(1).isEmpty
}

@@ -4483,6 +4478,17 @@ class Dataset[T] private[sql](
}
}

/** Returns an optimized Dataset with the `CommandResult` plan converted to a `LocalRelation`. */
private def commandResultOptimized: Dataset[T] = {
logicalPlan match {
case c: CommandResult =>
// Convert to `LocalRelation` and let `ConvertToLocalRelation` do the casting locally to
// avoid triggering a job
Dataset(sparkSession, LocalRelation(c.output, c.rows))
case _ => this
}
}

/** Convert to an RDD of serialized ArrowRecordBatches. */
private[sql] def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
val schemaCaptured = this.schema
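To illustrate the effect of the change above: a minimal sketch, not part of the patch, showing why `isEmpty` on a command result no longer needs a job. The object name `IsEmptySketch` and the local-mode session are assumptions for demonstration only.

import org.apache.spark.sql.SparkSession

object IsEmptySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumed local mode, for demonstration only
      .appName("isEmpty-sketch")
      .getOrCreate()

    // SHOW TABLES runs eagerly; its rows are already materialized on the
    // driver inside a CommandResult plan node.
    val df = spark.sql("SHOW TABLES")

    // With commandResultOptimized in place, isEmpty first rewrites the
    // CommandResult into a LocalRelation, so select().limit(1) can be
    // answered on the driver without scheduling a Spark job.
    println(df.isEmpty)

    spark.stop()
  }
}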
19 changes: 19 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2717,6 +2717,25 @@ class DatasetSuite extends QueryTest
checkDataset(ds.map(t => t),
WithSet(0, HashSet("foo", "bar")), WithSet(1, HashSet("bar", "zoo")))
}

test("SPARK-47270: isEmpty does not trigger job execution on CommandResults") {
withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
withTable("t1") {
sql("create table t1(c int) using parquet")

@volatile var jobCounter = 0
val listener = new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
jobCounter += 1
}
}
withListener(spark.sparkContext, listener) { _ =>
sql("show tables").isEmpty
}
assert(jobCounter === 0)
}
}
}
}

class DatasetLargeResultCollectingSuite extends QueryTest
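The job-counting pattern used by the new test can also be exercised outside the test harness. A hedged standalone sketch, assuming a local session; `JobCountSketch` and the sleep-based drain are illustrative and not part of the patch:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

object JobCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    @volatile var jobCounter = 0
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        jobCounter += 1
    }
    spark.sparkContext.addSparkListener(listener)

    val empty = try {
      // After SPARK-47270 this should schedule no job.
      spark.sql("SHOW TABLES").isEmpty
    } finally {
      // Listener events are delivered asynchronously; give the bus a
      // moment to drain before reading the counter (crude but portable).
      Thread.sleep(1000)
      spark.sparkContext.removeSparkListener(listener)
    }

    assert(jobCounter == 0, s"expected no jobs, saw $jobCounter")
    println(s"isEmpty = $empty, jobs started = $jobCounter")
    spark.stop()
  }
}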