sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.types.DataType
@@ -31,8 +30,6 @@ import org.apache.spark.sql.types.DataType
*/
private[sql] trait ColumnarBatchScan extends CodegenSupport {

val inMemoryTableScan: InMemoryTableScanExec = null

def vectorTypes: Option[Seq[String]] = None

override lazy val metrics = Map(
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -282,6 +282,18 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {

object WholeStageCodegenExec {
val PIPELINE_DURATION_METRIC = "duration"

private def numOfNestedFields(dataType: DataType): Int = dataType match {
case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
case a: ArrayType => numOfNestedFields(a.elementType)
case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
case _ => 1
}

def isTooManyFields(conf: SQLConf, dataType: DataType): Boolean = {
numOfNestedFields(dataType) > conf.wholeStageMaxNumFields
}
}
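
Note (not part of this diff): numOfNestedFields counts a schema's leaf fields recursively, and isTooManyFields flags schemas whose count exceeds conf.wholeStageMaxNumFields (spark.sql.codegen.maxFields, 100 by default). A minimal sketch of the counting on a hypothetical nested schema:

import org.apache.spark.sql.types._

// struct<a:int, b:array<struct<x:long, y:double>>, c:map<string,int>>
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", ArrayType(new StructType().add("x", LongType).add("y", DoubleType)))
  .add("c", MapType(StringType, IntegerType))

// Leaves: a, b.x, b.y, c.key, c.value => numOfNestedFields(schema) == 5,
// well under the default limit of 100, so isTooManyFields returns false here.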

/**
@@ -490,22 +502,14 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case _ => true
}

private def numOfNestedFields(dataType: DataType): Int = dataType match {
case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
case a: ArrayType => numOfNestedFields(a.elementType)
case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
case _ => 1
}

private def supportCodegen(plan: SparkPlan): Boolean = plan match {
case plan: CodegenSupport if plan.supportCodegen =>
val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
// the generated code will be huge if there are too many columns
val hasTooManyOutputFields =
numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
WholeStageCodegenExec.isTooManyFields(conf, plan.schema)
val hasTooManyInputFields =
plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
plan.children.exists(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema))
!willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
case _ => false
}
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -163,4 +163,12 @@ private[sql] object ColumnAccessor {
throw new RuntimeException("Not support non-primitive type now")
}
}

def decompress(
array: Array[Byte], columnVector: WritableColumnVector, dataType: DataType, numRows: Int):
Unit = {
val byteBuffer = ByteBuffer.wrap(array)
val columnAccessor = ColumnAccessor(dataType, byteBuffer)
decompress(columnAccessor, columnVector, numRows)
}
}
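
Note (not part of this diff): a usage sketch for the new overload; compressedBytes is a hypothetical stand-in for one entry of CachedBatch.buffers:

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._

val numRows = 1024
val schema = new StructType().add("col0", IntegerType)
// Allocate a writable on-heap vector and fill it from the compressed bytes.
val vector = OnHeapColumnVector.allocateColumns(numRows, schema)(0)
// `compressedBytes` (hypothetical) holds one compressed column's bytes.
ColumnAccessor.decompress(compressedBytes, vector, IntegerType, numRows)
// vector.getInt(i) now returns row i of the cached column.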
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -23,21 +23,66 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.vectorized._
import org.apache.spark.sql.types._


case class InMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
@transient relation: InMemoryRelation)
extends LeafExecNode {
extends LeafExecNode with ColumnarBatchScan {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
override def vectorTypes: Option[Seq[String]] =
Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))

/**
* If true, get data from ColumnVectors in a ColumnarBatch, which is generally faster.
* If false, get data from UnsafeRows built from the ColumnVectors.
*/
override val supportCodegen: Boolean = {
// In the initial implementation, for ease of review,
// support only primitive data types, and only when the number of fields is below wholeStageMaxNumFields
relation.schema.fields.forall(f => f.dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType |
FloatType | DoubleType => true
case _ => false
}) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema)
}
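
Note (not part of this diff): concretely, under these rules a cache of only primitive columns takes the ColumnarBatch path, while any unsupported type falls back to row-based reads:

// supportCodegen == true: both columns are primitive.
val fast = spark.range(100).selectExpr("id", "cast(id as double) d").cache()

// supportCodegen == false: StringType is not in the supported list.
val slow = spark.range(100).selectExpr("cast(id as string) s").cache()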

private val columnIndices =
attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray

private val relationSchema = relation.schema.toArray

private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))

private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
val rowCount = cachedColumnarBatch.numRows
val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
Contributor: Can we reuse the OnHeapColumnVector for the cached batches? It's a little inefficient to create one column vector for each cached batch.

@kiszk (Member, Author), Oct 18, 2017: I agree that we could improve efficiency by reusing the OnHeapColumnVector.

I think it is not easy to reuse an OnHeapColumnVector across different cached batches, though. IIUC there is no point at which we know a cached batch will no longer be referenced, so we rely on GC to manage the lifetime by creating a fresh OnHeapColumnVector for each cached batch. If we reused one (i.e. kept a reference to it), GC could not dispose of it even after the generated code stops using it, meaning the uncompressed (huge) data would stay alive for a long time. If we knew the point where a cached batch is no longer referenced, we could null out the data in the OnHeapColumnVector.

Thus, I currently create an OnHeapColumnVector for each cached batch. What do you think?

Contributor: Can't we use TaskContext.addTaskCompletionListener?

@kiszk (Member, Author), Oct 18, 2017: Good idea. We could null out a data field (e.g. OnHeapColumnVector.intData) by registering a clear() method with TaskContext.addTaskCompletionListener.

In that case, I realized we would have to reallocate the large array behind each data field every time. Do we still need to worry about the cost of allocating the OnHeapColumnVector itself, which is relatively small compared to that large array?

If it still makes sense, I will try to implement the clear() method and the array reallocation, which may introduce a new API on OnHeapColumnVector. What do you think?

Contributor: OK, maybe leave it as a follow-up for now.

@kiszk (Member, Author): I see. Let us make a follow-up PR in the future.
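
Note (not part of this patch): a rough sketch of the follow-up idea, assuming a hypothetical clear() on the column vectors that nulls out their data arrays:

import org.apache.spark.TaskContext

// Hypothetical: free the decompressed data when the task finishes,
// instead of waiting for GC to collect the whole vector.
val taskContext = TaskContext.get()
if (taskContext != null) {
  taskContext.addTaskCompletionListener { _ =>
    // clear() does not exist yet; it would null out fields like intData
    columnVectors.foreach(_.clear())
  }
}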

Member: Is there any reason we use OnHeapColumnVector instead of OffHeapColumnVector?

@kiszk (Member, Author), Oct 23, 2017: Both can be used. I follow this default configuration since it is not easy to get tungstenMemoryMode here.
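
Note (not part of this patch): if the memory mode were available, the choice could look like the sketch below; useOffHeap is a hypothetical flag, and OffHeapColumnVector is assumed to expose the same allocateColumns helper:

import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}

// `useOffHeap` (hypothetical) stands in for tungstenMemoryMode, which
// (as noted above) is not easily obtainable at this point.
val columnVectors: Seq[WritableColumnVector] =
  if (useOffHeap) OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
  else OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)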

val columnarBatch = new ColumnarBatch(
columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
columnarBatch.setNumRows(rowCount)

for (i <- 0 until attributes.length) {
ColumnAccessor.decompress(
cachedColumnarBatch.buffers(columnIndices(i)),
columnarBatch.column(i).asInstanceOf[WritableColumnVector],
columnarBatchSchema.fields(i).dataType, rowCount)
}
columnarBatch
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
assert(supportCodegen)
val buffers = relation.cachedColumnBuffers
// HACK ALERT: This is actually an RDD[ColumnarBatch].
// We're taking advantage of Scala's type erasure here to pass these batches along.
Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]])
}
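
Note (not part of this diff): the cast is safe at runtime because generic type arguments are erased; only the consumer must cast elements back to ColumnarBatch. A self-contained illustration of the same trick:

val xs: Seq[Int] = Seq(1, 2, 3)
// Compiles and succeeds at runtime: only the Seq itself is checked, not Int.
val ys: Seq[String] = xs.asInstanceOf[Seq[String]]
// A ClassCastException would occur only when an element is read as a String.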

override def output: Seq[Attribute] = attributes

sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
@@ -73,4 +73,40 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext {
val df = spark.createDataFrame(data, schema)
assert(df.select("b").first() === Row(outerStruct))
}

test("primitive data type accesses in persist data") {
val data = Seq(true, 1.toByte, 3.toShort, 7, 15.toLong,
31.25.toFloat, 63.75, null)
val dataTypes = Seq(BooleanType, ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, IntegerType)
val schemas = dataTypes.zipWithIndex.map { case (dataType, index) =>
StructField(s"col$index", dataType, true)
}
val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(data)))
val df = spark.createDataFrame(rdd, StructType(schemas))
val row = df.persist.take(1).apply(0)
checkAnswer(df, row)
}

test("access cache multiple times") {
val df0 = sparkContext.parallelize(Seq(1, 2, 3), 1).toDF("x").cache
df0.count
val df1 = df0.filter("x > 1")
checkAnswer(df1, Seq(Row(2), Row(3)))
val df2 = df0.filter("x > 2")
checkAnswer(df2, Row(3))

val df10 = sparkContext.parallelize(Seq(3, 4, 5, 6), 1).toDF("x").cache
for (_ <- 0 to 2) {
val df11 = df10.filter("x > 5")
checkAnswer(df11, Row(6))
}
}

test("access only some column of the all of columns") {
val df = spark.range(1, 10).map(i => (i, (i + 1).toDouble)).toDF("l", "d")
df.cache
df.count
assert(df.filter("d < 3").count == 1)
}
}
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.expressions.scalalang.typed
@@ -117,6 +118,37 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
}

test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") {
import testImplicits._

val dsInt = spark.range(3).cache
dsInt.count
val dsIntFilter = dsInt.filter(_ > 0)
val planInt = dsIntFilter.queryExecution.executedPlan
assert(planInt.find(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child
.isInstanceOf[InMemoryTableScanExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child
.asInstanceOf[InMemoryTableScanExec].supportCodegen).isDefined
)
assert(dsIntFilter.collect() === Array(1, 2))

// cache of string type is not supported by the InMemoryTableScanExec codegen path
val dsString = spark.range(3).map(_.toString).cache
dsString.count
val dsStringFilter = dsString.filter(_ == "1")
val planString = dsStringFilter.queryExecution.executedPlan
assert(planString.find(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec] &&
!p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child
.isInstanceOf[InMemoryTableScanExec]).isDefined
)
assert(dsStringFilter.collect() === Array("1"))
}

test("SPARK-19512 codegen for comparing structs is incorrect") {
// this would raise CompileException before the fix
spark.range(10)