sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

@@ -319,3 +319,46 @@ case class RDDScanExec(

   override def getStream: Option[SparkDataStream] = stream
 }
+
+/**
+ * A physical plan node for `OneRowRelation`, used for scans with no 'FROM' clause.
+ *
+ * We do not extend `RDDScanExec` in order to avoid complexity due to `TreeNode.makeCopy` and
+ * `TreeNode`'s general use of reflection.
+ */
+case class OneRowRelationExec() extends LeafExecNode
+  with InputRDDCodegen {
+
+  override val nodeName: String = "Scan OneRowRelation"
+
+  override val output: Seq[Attribute] = Nil
+
+  private val rdd: RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    session
+      .sparkContext
+      .parallelize(Seq(""), 1)
+      .mapPartitionsInternal { _ =>
+        val proj = UnsafeProjection.create(Seq.empty[Expression])
+        Iterator(proj.apply(InternalRow.empty)).map { r =>
+          numOutputRows += 1
+          r
+        }
+      }
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = rdd
+
+  override def simpleString(maxFields: Int): String = s"$nodeName[]"
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  override protected val createUnsafeProjection: Boolean = false
+
+  override protected def doCanonicalize(): SparkPlan = {
+    super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
+  }
+}
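
For orientation, and not part of the patch itself: once the `SparkStrategies` change below routes `OneRowRelation` here, a FROM-less query plans directly into this node. A minimal spark-shell sketch, assuming an active `SparkSession` named `spark` with this patch applied; the exact expression IDs in the sample output are illustrative:

```scala
// A query with no FROM clause produces a logical OneRowRelation,
// which the planner now maps to OneRowRelationExec.
val df = spark.sql("SELECT 'test' AS stringCol")
df.explain()
// Expected shape of the output (IDs illustrative):
// == Physical Plan ==
// *(1) Project [test AS stringCol#0]
// +- *(1) Scan OneRowRelation[]
```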
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

@@ -690,8 +690,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }

-  protected lazy val singleRowRdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
-
   object InMemoryScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
@@ -1054,7 +1052,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           generator, g.requiredChildOutput, outer,
           g.qualifiedGeneratorOutput, planLater(child)) :: Nil
       case _: logical.OneRowRelation =>
-        execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
+        execution.OneRowRelationExec() :: Nil
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
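
An editorial aside, sketched rather than asserted by the diff: because `OneRowRelationExec` is a parameterless case class with an explicit `doCanonicalize`, the leaf scans of two different FROM-less queries canonicalize to equal plans. A hypothetical REPL check, again assuming the patch is applied:

```scala
import org.apache.spark.sql.execution.OneRowRelationExec

val p1 = spark.sql("SELECT 1").queryExecution.executedPlan
val p2 = spark.sql("SELECT 2").queryExecution.executedPlan

// Locate the leaf scan in each physical plan and compare canonical forms.
val s1 = p1.collectFirst { case o: OneRowRelationExec => o }.get
val s2 = p2.collectFirst { case o: OneRowRelationExec => o }.get
assert(s1.canonicalized == s2.canonicalized)
```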
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

@@ -56,6 +56,7 @@ trait CodegenSupport extends SparkPlan {
    case _: SortMergeJoinExec => "smj"
    case _: BroadcastNestedLoopJoinExec => "bnlj"
    case _: RDDScanExec => "rdd"
+   case _: OneRowRelationExec => "orr"
    case _: DataSourceScanExec => "scan"
    case _: InMemoryTableScanExec => "memoryScan"
    case _: WholeStageCodegenExec => "wholestagecodegen"
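
More editorial context, not part of the diff: `variablePrefix` feeds the fresh-name prefix that whole-stage codegen uses when naming generated variables, so code generated for this node now carries `orr`. A hypothetical way to observe it with the debug helpers:

```scala
import org.apache.spark.sql.execution.debug._

// Dumps the generated Java source for each WholeStageCodegen subtree.
// Variables tied to the OneRowRelationExec input should carry the "orr"
// prefix (names like orr_row are illustrative), where the old
// RDDScanExec-based plan used "rdd".
spark.sql("SELECT 'test' AS stringCol").debugCodegen()
```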
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (13 additions, 1 deletion)
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedCo
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression, Sort}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.execution.{CommandResultExec, UnionExec}
+import org.apache.spark.sql.execution.{CommandResultExec, OneRowRelationExec, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -4962,6 +4962,18 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       parameters = Map("plan" -> "'Aggregate [groupingsets(Vector(0), posexplode(array(col)))]")
     )
   }
+
+  Seq(true, false).foreach { codegenEnabled =>
+    test(s"SPARK-52060: one row relation with codegen enabled - $codegenEnabled") {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) {
+        val df = spark.sql("select 'test' stringCol")
+        checkAnswer(df, Row("test"))
+        val plan = df.queryExecution.executedPlan
+        val oneRowRelationExists = plan.find(_.isInstanceOf[OneRowRelationExec]).isDefined
+        assert(oneRowRelationExists)
+      }
+    }
+  }
 }

 case class Foo(bar: Option[String])