From 8134e73a3be499fdc7e007e950e704111c8eba15 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 4 Oct 2017 17:46:13 +0100 Subject: [PATCH 01/11] enable code generation for primitive type --- .../sql/execution/ColumnarBatchScan.scala | 32 ++++++++++++--- .../sql/execution/WholeStageCodegenExec.scala | 24 +++++++----- .../execution/columnar/ColumnAccessor.scala | 8 ++++ .../execution/columnar/InMemoryRelation.scala | 39 +++++++++++++++++-- .../columnar/InMemoryTableScanExec.scala | 26 ++++++++++--- .../spark/sql/DataFrameTungstenSuite.scala | 23 +++++++++++ .../execution/WholeStageCodegenSuite.scala | 32 +++++++++++++++ 7 files changed, 160 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 1afe83ea3539e..c511690cead16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector} -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector, WritableColumnVector} +import org.apache.spark.sql.types.{DataType, DataTypes} /** @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.DataType */ private[sql] trait ColumnarBatchScan extends CodegenSupport { - val inMemoryTableScan: InMemoryTableScanExec = null + val columnIndexes: Array[Int] = null def vectorTypes: Option[Seq[String]] = None @@ -84,25 +84,45 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") + val cachedBatchClz = "org.apache.spark.sql.execution.columnar.CachedBatch" + val cachedBatch = ctx.freshName("cachedBatch") val idx = ctx.freshName("batchIdx") ctx.addMutableState("int", idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) val columnVectorClzs = vectorTypes.getOrElse( Seq.fill(colVars.size)(classOf[ColumnVector].getName)) + val columnAccessorClz = "org.apache.spark.sql.execution.columnar.ColumnAccessor" + val writableColumnVectorClz = classOf[WritableColumnVector].getName + val dataTypesClz = classOf[DataTypes].getName val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map { case ((name, columnVectorClz), i) => ctx.addMutableState(columnVectorClz, name, s"$name = null;") - s"$name = ($columnVectorClz) $batch.column($i);" + val index = if (columnIndexes == null) i else columnIndexes(i) + s"$name = ($columnVectorClz) $batch.column($index);" + (if (columnIndexes == null) "" else { + val dt = output.attrs(i).dataType + s"\n$columnAccessorClz$$.MODULE$$.decompress(" + + s"$cachedBatch.buffers()[$index], ($writableColumnVectorClz) $name, " + + s"$dataTypesClz.$dt, $cachedBatch.numRows());" + }) } + val assignBatch = if (columnIndexes == null) { + s"$batch = ($columnarBatchClz)$input.next();" + } else { + val inMemoryRelationClz = classOf[InMemoryRelation].getName + s""" + $cachedBatchClz $cachedBatch = ($cachedBatchClz)$input.next(); + $batch = $inMemoryRelationClz$$.MODULE$$.createColumn($cachedBatch); + """ + } val nextBatch = ctx.freshName("nextBatch") val nextBatchFuncName = ctx.addNewFunction(nextBatch, s""" |private void $nextBatch() throws java.io.IOException { | long getBatchStart = System.nanoTime(); | if ($input.hasNext()) { - | $batch = ($columnarBatchClz)$input.next(); + | $assignBatch | $numOutputRows.add($batch.numRows()); | $idx = 0; | ${columnAssigns.mkString("", "\n", "\n")} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 1aaaf896692d1..de600c6e085dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -282,6 +282,18 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp object WholeStageCodegenExec { val PIPELINE_DURATION_METRIC = "duration" + + private def numOfNestedFields(dataType: DataType): Int = dataType match { + case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum + case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType) + case a: ArrayType => numOfNestedFields(a.elementType) + case u: UserDefinedType[_] => numOfNestedFields(u.sqlType) + case _ => 1 + } + + def isTooManyFields(conf: SQLConf, dataType: DataType): Boolean = { + numOfNestedFields(dataType) > conf.wholeStageMaxNumFields + } } /** @@ -490,22 +502,14 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { case _ => true } - private def numOfNestedFields(dataType: DataType): Int = dataType match { - case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum - case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType) - case a: ArrayType => numOfNestedFields(a.elementType) - case u: UserDefinedType[_] => numOfNestedFields(u.sqlType) - case _ => 1 - } - private def supportCodegen(plan: SparkPlan): Boolean = plan match { case plan: CodegenSupport if plan.supportCodegen => val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined) // the generated code will be huge if there are too many columns val hasTooManyOutputFields = - numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields + WholeStageCodegenExec.isTooManyFields(conf, plan.schema) val hasTooManyInputFields = - plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields) + plan.children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isDefined !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala index 24c8ac81420cb..445933d98e9d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala @@ -163,4 +163,12 @@ private[sql] object ColumnAccessor { throw new RuntimeException("Not support non-primitive type now") } } + + def decompress( + array: Array[Byte], columnVector: WritableColumnVector, dataType: DataType, numRows: Int): + Unit = { + val byteBuffer = ByteBuffer.wrap(array) + val columnAccessor = ColumnAccessor(dataType, byteBuffer) + decompress(columnAccessor, columnVector, numRows) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index a1c62a729900e..f43a77f594ae7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -26,7 +26,9 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector, OnHeapColumnVector} +import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.LongAccumulator @@ -39,6 +41,16 @@ object InMemoryRelation { child: SparkPlan, tableName: Option[String]): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)() + + def createColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { + val rowCount = cachedColumnarBatch.numRows + val schema = cachedColumnarBatch.schema + val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, schema) + val columnarBatch = new ColumnarBatch( + schema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) + columnarBatch.setNumRows(rowCount) + return columnarBatch + } } @@ -48,9 +60,11 @@ object InMemoryRelation { * @param numRows The total number of rows in this batch * @param buffers The buffers for serialized columns * @param stats The stat of columns + * @param schema The schema of columns */ private[columnar] -case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) +case class CachedBatch( + numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow, schema: StructType) case class InMemoryRelation( output: Seq[Attribute], @@ -63,6 +77,23 @@ case class InMemoryRelation( val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator) extends logical.LeafNode with MultiInstanceRelation { + /** + * If true, get data from ColumnVector in ColumnarBatch, which are generally faster. + * If false, get data from UnsafeRow build from ColumnVector + */ + private[columnar] val useColumnarBatches: Boolean = { + // In the initial implementation, for ease of review + // support only primitive data types and # of fields is less than wholeStageMaxNumFields + val schema = StructType.fromAttributes(child.output) + schema.fields.find(f => f.dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | + FloatType | DoubleType => false + case _ => true + }).isEmpty && + !WholeStageCodegenExec.isTooManyFields(conf, child.schema) && + children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty + } + override protected def innerChildren: Seq[SparkPlan] = Seq(child) override def producedAttributes: AttributeSet = outputSet @@ -87,6 +118,7 @@ case class InMemoryRelation( private def buildBuffers(): Unit = { val output = child.output + val useColumnarBatch = useColumnarBatches val cached = child.execute().mapPartitionsInternal { rowIterator => new Iterator[CachedBatch] { def next(): CachedBatch = { @@ -126,7 +158,8 @@ case class InMemoryRelation( columnBuilders.flatMap(_.columnStats.collectedStatistics)) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) - }, stats) + }, stats, + if (useColumnarBatch) StructType.fromAttributes(output) else null) } def hasNext: Boolean = rowIterator.hasNext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index af3636a5a2ca7..d37fd905fa9d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} -import org.apache.spark.sql.execution.LeafExecNode -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode} +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.types.UserDefinedType @@ -32,12 +32,28 @@ case class InMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], @transient relation: InMemoryRelation) - extends LeafExecNode { + extends LeafExecNode with ColumnarBatchScan { override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren - override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override def vectorTypes: Option[Seq[String]] = + Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName)) + + override val columnIndexes = + attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray + + override val supportCodegen: Boolean = relation.useColumnarBatches + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + if (supportCodegen) { + val buffers = relation.cachedColumnBuffers + // HACK ALERT: This is actually an RDD[CachedBatch]. + // We're taking advantage of Scala's type erasure here to pass these batches along. + Seq(buffers.asInstanceOf[RDD[InternalRow]]) + } else { + Seq() + } + } override def output: Seq[Attribute] = attributes diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala index fe6ba83b4cbfb..f941a30506c8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala @@ -73,4 +73,27 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext { val df = spark.createDataFrame(data, schema) assert(df.select("b").first() === Row(outerStruct)) } + + test("primitive data type accesses in persist data") { + val data = Seq(true, 1.toByte, 3.toShort, 7, 15.toLong, + 31.25.toFloat, 63.75, null) + val dataTypes = Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, IntegerType) + val schemas = dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, true) + } + val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(data))) + val df = spark.createDataFrame(rdd, StructType(schemas)) + val row = df.persist.take(1).apply(0) + checkAnswer(df, row) + } + + test("access cache multiple times") { + val df = sparkContext.parallelize(Seq(1, 2, 3), 1).toDF("x").cache + df.count + val df1 = df.filter("x > 1") + checkAnswer(df1, Seq(Row(2), Row(3))) + val df2 = df.filter("x > 2") + checkAnswer(df2, Row(3)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 098e4cfeb15b2..bc05dca578c47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.expressions.scalalang.typed @@ -117,6 +118,37 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } + test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") { + import testImplicits._ + + val dsInt = spark.range(3).cache + dsInt.count + val dsIntFilter = dsInt.filter(_ > 0) + val planInt = dsIntFilter.queryExecution.executedPlan + assert(planInt.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec] && + p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child + .isInstanceOf[InMemoryTableScanExec] && + p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child + .asInstanceOf[InMemoryTableScanExec].supportCodegen).isDefined + ) + assert(dsIntFilter.collect() === Array(1, 2)) + + // cache for string type is not supported for InMemoryTableScanExec + val dsString = spark.range(3).map(_.toString).cache + dsString.count + val dsStringFilter = dsString.filter(_ == "1") + val planString = dsStringFilter.queryExecution.executedPlan + assert(planString.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec] && + !p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child + .isInstanceOf[InMemoryTableScanExec]).isDefined + ) + assert(dsStringFilter.collect() === Array("1")) + } + test("SPARK-19512 codegen for comparing structs is incorrect") { // this would raise CompileException before the fix spark.range(10) From 8ae2368209b42bdb02a4decc7fe81d17d6a1fb9a Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 13 Oct 2017 20:22:47 +0100 Subject: [PATCH 02/11] move creation of ColumnarBatch to InMemoryTableScanExec.inputRDDs --- .../sql/execution/ColumnarBatchScan.scala | 22 ++----------------- .../execution/columnar/InMemoryRelation.scala | 11 ---------- .../columnar/InMemoryTableScanExec.scala | 22 ++++++++++++++++--- 3 files changed, 21 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index c511690cead16..875a22017d2ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -84,7 +84,6 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") - val cachedBatchClz = "org.apache.spark.sql.execution.columnar.CachedBatch" val cachedBatch = ctx.freshName("cachedBatch") val idx = ctx.freshName("batchIdx") @@ -92,37 +91,20 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) val columnVectorClzs = vectorTypes.getOrElse( Seq.fill(colVars.size)(classOf[ColumnVector].getName)) - val columnAccessorClz = "org.apache.spark.sql.execution.columnar.ColumnAccessor" - val writableColumnVectorClz = classOf[WritableColumnVector].getName - val dataTypesClz = classOf[DataTypes].getName val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map { case ((name, columnVectorClz), i) => ctx.addMutableState(columnVectorClz, name, s"$name = null;") val index = if (columnIndexes == null) i else columnIndexes(i) - s"$name = ($columnVectorClz) $batch.column($index);" + (if (columnIndexes == null) "" else { - val dt = output.attrs(i).dataType - s"\n$columnAccessorClz$$.MODULE$$.decompress(" + - s"$cachedBatch.buffers()[$index], ($writableColumnVectorClz) $name, " + - s"$dataTypesClz.$dt, $cachedBatch.numRows());" - }) + s"$name = ($columnVectorClz) $batch.column($index);" } - val assignBatch = if (columnIndexes == null) { - s"$batch = ($columnarBatchClz)$input.next();" - } else { - val inMemoryRelationClz = classOf[InMemoryRelation].getName - s""" - $cachedBatchClz $cachedBatch = ($cachedBatchClz)$input.next(); - $batch = $inMemoryRelationClz$$.MODULE$$.createColumn($cachedBatch); - """ - } val nextBatch = ctx.freshName("nextBatch") val nextBatchFuncName = ctx.addNewFunction(nextBatch, s""" |private void $nextBatch() throws java.io.IOException { | long getBatchStart = System.nanoTime(); | if ($input.hasNext()) { - | $assignBatch + | $batch = ($columnarBatchClz)$input.next(); | $numOutputRows.add($batch.numRows()); | $idx = 0; | ${columnAssigns.mkString("", "\n", "\n")} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index f43a77f594ae7..45e3a3e69c9ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} -import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector, OnHeapColumnVector} import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.LongAccumulator @@ -41,16 +40,6 @@ object InMemoryRelation { child: SparkPlan, tableName: Option[String]): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)() - - def createColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { - val rowCount = cachedColumnarBatch.numRows - val schema = cachedColumnarBatch.schema - val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, schema) - val columnarBatch = new ColumnarBatch( - schema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) - columnarBatch.setNumRows(rowCount) - return columnarBatch - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index d37fd905fa9d4..0d12884155b95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode} -import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector +import org.apache.spark.sql.execution.vectorized._ import org.apache.spark.sql.types.UserDefinedType @@ -44,12 +44,28 @@ case class InMemoryTableScanExec( override val supportCodegen: Boolean = relation.useColumnarBatches + private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { + val rowCount = cachedColumnarBatch.numRows + val schema = cachedColumnarBatch.schema + val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, schema) + val columnarBatch = new ColumnarBatch( + schema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) + columnarBatch.setNumRows(rowCount) + + for (i <- 0 until cachedColumnarBatch.buffers.length) { + ColumnAccessor.decompress( + cachedColumnarBatch.buffers(i), columnarBatch.column(i).asInstanceOf[WritableColumnVector], + schema.fields(i).dataType, rowCount) + } + return columnarBatch + } + override def inputRDDs(): Seq[RDD[InternalRow]] = { if (supportCodegen) { val buffers = relation.cachedColumnBuffers - // HACK ALERT: This is actually an RDD[CachedBatch]. + // HACK ALERT: This is actually an RDD[ColumnarBatch]. // We're taking advantage of Scala's type erasure here to pass these batches along. - Seq(buffers.asInstanceOf[RDD[InternalRow]]) + Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]]) } else { Seq() } From 750b2308080c7ad135f231770b2fcab71523cbf4 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 13 Oct 2017 20:23:10 +0100 Subject: [PATCH 03/11] add new test cases --- .../spark/sql/DataFrameTungstenSuite.scala | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala index f941a30506c8e..4483a62bd7a6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala @@ -89,11 +89,25 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext { } test("access cache multiple times") { - val df = sparkContext.parallelize(Seq(1, 2, 3), 1).toDF("x").cache - df.count - val df1 = df.filter("x > 1") + val df0 = sparkContext.parallelize(Seq(1, 2, 3), 1).toDF("x").cache + df0.count + val df1 = df0.filter("x > 1") checkAnswer(df1, Seq(Row(2), Row(3))) - val df2 = df.filter("x > 2") + val df2 = df0.filter("x > 2") checkAnswer(df2, Row(3)) + + val df10 = sparkContext.parallelize(Seq(3, 4, 5, 6), 1).toDF("x").cache + for (_ <- 0 to 2) { + val df11 = df10.filter("x > 5") + checkAnswer(df11, Row(6)) + } + } + + test("some columns in table cache are not accessed") { + val df = sparkContext.parallelize( + Seq((1, 1.1), (2, 2.2), (3, 3.3)), 1).toDF("x", "y").cache + df.count + val df1 = df.filter("y > 2.2") + checkAnswer(df1, Row(3, 3.3)) } } From b367a707ba0af72215866f75157f3387f4a17345 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 16 Oct 2017 10:56:48 +0100 Subject: [PATCH 04/11] address review comment --- .../org/apache/spark/sql/execution/ColumnarBatchScan.scala | 5 +---- .../sql/execution/columnar/InMemoryTableScanExec.scala | 7 +++++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 875a22017d2ea..588e686d62d66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -31,8 +31,6 @@ import org.apache.spark.sql.types.{DataType, DataTypes} */ private[sql] trait ColumnarBatchScan extends CodegenSupport { - val columnIndexes: Array[Int] = null - def vectorTypes: Option[Seq[String]] = None override lazy val metrics = Map( @@ -94,8 +92,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map { case ((name, columnVectorClz), i) => ctx.addMutableState(columnVectorClz, name, s"$name = null;") - val index = if (columnIndexes == null) i else columnIndexes(i) - s"$name = ($columnVectorClz) $batch.column($index);" + s"$name = ($columnVectorClz) $batch.column($i);" } val nextBatch = ctx.freshName("nextBatch") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 0d12884155b95..ebf176d033249 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -39,7 +39,7 @@ case class InMemoryTableScanExec( override def vectorTypes: Option[Seq[String]] = Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName)) - override val columnIndexes = + val columnIndices = attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray override val supportCodegen: Boolean = relation.useColumnarBatches @@ -53,8 +53,11 @@ case class InMemoryTableScanExec( columnarBatch.setNumRows(rowCount) for (i <- 0 until cachedColumnarBatch.buffers.length) { + val index = if (columnIndices == null || columnIndices.length == 0) i + else columnIndices(i) ColumnAccessor.decompress( - cachedColumnarBatch.buffers(i), columnarBatch.column(i).asInstanceOf[WritableColumnVector], + cachedColumnarBatch.buffers(i), + columnarBatch.column(index).asInstanceOf[WritableColumnVector], schema.fields(i).dataType, rowCount) } return columnarBatch From a3646e3351d19a93583a7093898fa424358a5b3d Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 16 Oct 2017 16:52:52 +0100 Subject: [PATCH 05/11] fix failures SPARK-6743, access only some column, SPARK-20356 --- .../execution/columnar/InMemoryTableScanExec.scala | 14 +++++++------- .../apache/spark/sql/DataFrameTungstenSuite.scala | 9 ++++----- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index ebf176d033249..f29916e5a1d41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode} import org.apache.spark.sql.execution.vectorized._ -import org.apache.spark.sql.types.UserDefinedType +import org.apache.spark.sql.types.{StructType, UserDefinedType} case class InMemoryTableScanExec( @@ -46,18 +46,18 @@ case class InMemoryTableScanExec( private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { val rowCount = cachedColumnarBatch.numRows - val schema = cachedColumnarBatch.schema + val originalSchema = cachedColumnarBatch.schema.toArray + val schema = new StructType(columnIndices.map(i => originalSchema(i))) val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, schema) val columnarBatch = new ColumnarBatch( schema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) columnarBatch.setNumRows(rowCount) - for (i <- 0 until cachedColumnarBatch.buffers.length) { - val index = if (columnIndices == null || columnIndices.length == 0) i - else columnIndices(i) + for (i <- 0 until attributes.length) { + val index = if (columnIndices.length == 0) i else columnIndices(i) ColumnAccessor.decompress( - cachedColumnarBatch.buffers(i), - columnarBatch.column(index).asInstanceOf[WritableColumnVector], + cachedColumnarBatch.buffers(index), + columnarBatch.column(i).asInstanceOf[WritableColumnVector], schema.fields(i).dataType, rowCount) } return columnarBatch diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala index 4483a62bd7a6a..0881212a64de8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala @@ -103,11 +103,10 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext { } } - test("some columns in table cache are not accessed") { - val df = sparkContext.parallelize( - Seq((1, 1.1), (2, 2.2), (3, 3.3)), 1).toDF("x", "y").cache + test("access only some column of the all of columns") { + val df = spark.range(1, 10).map(i => (i, (i + 1).toDouble)).toDF("l", "d") + df.cache df.count - val df1 = df.filter("y > 2.2") - checkAnswer(df1, Row(3, 3.3)) + assert(df.filter("d < 3").count == 1) } } From 34982c6f5839cddeadf2d8480fd768fe3c6f01e6 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 17 Oct 2017 04:30:18 +0100 Subject: [PATCH 06/11] address review comments --- .../spark/sql/execution/ColumnarBatchScan.scala | 6 ++---- .../sql/execution/columnar/InMemoryRelation.scala | 7 ++----- .../execution/columnar/InMemoryTableScanExec.scala | 14 ++++++++------ 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 588e686d62d66..eb01e126bcbef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector, WritableColumnVector} -import org.apache.spark.sql.types.{DataType, DataTypes} +import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.types.DataType /** @@ -82,7 +81,6 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") - val cachedBatch = ctx.freshName("cachedBatch") val idx = ctx.freshName("batchIdx") ctx.addMutableState("int", idx, s"$idx = 0;") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 45e3a3e69c9ee..2760f3fc2f8b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -49,11 +49,9 @@ object InMemoryRelation { * @param numRows The total number of rows in this batch * @param buffers The buffers for serialized columns * @param stats The stat of columns - * @param schema The schema of columns */ private[columnar] -case class CachedBatch( - numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow, schema: StructType) +case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) case class InMemoryRelation( output: Seq[Attribute], @@ -147,8 +145,7 @@ case class InMemoryRelation( columnBuilders.flatMap(_.columnStats.collectedStatistics)) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) - }, stats, - if (useColumnarBatch) StructType.fromAttributes(output) else null) + }, stats) } def hasNext: Boolean = rowIterator.hasNext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index f29916e5a1d41..6bb0174c69bc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -39,18 +39,20 @@ case class InMemoryTableScanExec( override def vectorTypes: Option[Seq[String]] = Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName)) + override val supportCodegen: Boolean = relation.useColumnarBatches + val columnIndices = attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray - override val supportCodegen: Boolean = relation.useColumnarBatches + val relationSchema = relation.schema.toArray + + val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i))) private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { val rowCount = cachedColumnarBatch.numRows - val originalSchema = cachedColumnarBatch.schema.toArray - val schema = new StructType(columnIndices.map(i => originalSchema(i))) - val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, schema) + val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema) val columnarBatch = new ColumnarBatch( - schema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) + columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) columnarBatch.setNumRows(rowCount) for (i <- 0 until attributes.length) { @@ -58,7 +60,7 @@ case class InMemoryTableScanExec( ColumnAccessor.decompress( cachedColumnarBatch.buffers(index), columnarBatch.column(i).asInstanceOf[WritableColumnVector], - schema.fields(i).dataType, rowCount) + columnarBatchSchema.fields(i).dataType, rowCount) } return columnarBatch } From 45014d702339193a5f053c7cbeba55d59659229f Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 17 Oct 2017 08:53:21 +0100 Subject: [PATCH 07/11] fix test failures in HiveCompatibilitySuite and HiveQuerySuite --- .../sql/execution/columnar/InMemoryTableScanExec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 6bb0174c69bc1..b5f2e683e327e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -41,12 +41,12 @@ case class InMemoryTableScanExec( override val supportCodegen: Boolean = relation.useColumnarBatches - val columnIndices = + private val columnIndices = attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray - val relationSchema = relation.schema.toArray + private val relationSchema = relation.schema.toArray - val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i))) + private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i))) private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { val rowCount = cachedColumnarBatch.numRows From c356ebea343fc9285d12cd66f132430e8d9eaf43 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 17 Oct 2017 16:56:25 +0100 Subject: [PATCH 08/11] address review comments --- .../execution/columnar/InMemoryRelation.scala | 21 +-------------- .../columnar/InMemoryTableScanExec.scala | 26 ++++++++++++++----- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 2760f3fc2f8b9..a1c62a729900e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -26,8 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics -import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} -import org.apache.spark.sql.types._ +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.storage.StorageLevel import org.apache.spark.util.LongAccumulator @@ -64,23 +63,6 @@ case class InMemoryRelation( val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator) extends logical.LeafNode with MultiInstanceRelation { - /** - * If true, get data from ColumnVector in ColumnarBatch, which are generally faster. - * If false, get data from UnsafeRow build from ColumnVector - */ - private[columnar] val useColumnarBatches: Boolean = { - // In the initial implementation, for ease of review - // support only primitive data types and # of fields is less than wholeStageMaxNumFields - val schema = StructType.fromAttributes(child.output) - schema.fields.find(f => f.dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | - FloatType | DoubleType => false - case _ => true - }).isEmpty && - !WholeStageCodegenExec.isTooManyFields(conf, child.schema) && - children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty - } - override protected def innerChildren: Seq[SparkPlan] = Seq(child) override def producedAttributes: AttributeSet = outputSet @@ -105,7 +87,6 @@ case class InMemoryRelation( private def buildBuffers(): Unit = { val output = child.output - val useColumnarBatch = useColumnarBatches val cached = child.execute().mapPartitionsInternal { rowIterator => new Iterator[CachedBatch] { def next(): CachedBatch = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index b5f2e683e327e..7637d5dc16783 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} -import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode} +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.vectorized._ -import org.apache.spark.sql.types.{StructType, UserDefinedType} +import org.apache.spark.sql.types._ case class InMemoryTableScanExec( @@ -39,7 +39,22 @@ case class InMemoryTableScanExec( override def vectorTypes: Option[Seq[String]] = Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName)) - override val supportCodegen: Boolean = relation.useColumnarBatches + /** + * If true, get data from ColumnVector in ColumnarBatch, which are generally faster. + * If false, get data from UnsafeRow build from ColumnVector + */ + override val supportCodegen: Boolean = { + // In the initial implementation, for ease of review + // support only primitive data types and # of fields is less than wholeStageMaxNumFields + val schema = StructType.fromAttributes(relation.output) + schema.fields.find(f => f.dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | + FloatType | DoubleType => false + case _ => true + }).isEmpty && + !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) && + children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty + } private val columnIndices = attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray @@ -56,13 +71,12 @@ case class InMemoryTableScanExec( columnarBatch.setNumRows(rowCount) for (i <- 0 until attributes.length) { - val index = if (columnIndices.length == 0) i else columnIndices(i) ColumnAccessor.decompress( - cachedColumnarBatch.buffers(index), + cachedColumnarBatch.buffers(columnIndices(i)), columnarBatch.column(i).asInstanceOf[WritableColumnVector], columnarBatchSchema.fields(i).dataType, rowCount) } - return columnarBatch + columnarBatch } override def inputRDDs(): Seq[RDD[InternalRow]] = { From fd8cdbdaeaeabc27e81ea6facec5e407e31e4984 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 19 Oct 2017 07:00:23 +0100 Subject: [PATCH 09/11] address review comment --- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 7637d5dc16783..86d4ac5dd4207 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -51,9 +51,7 @@ case class InMemoryTableScanExec( case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType => false case _ => true - }).isEmpty && - !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) && - children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty + }).isEmpty && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) } private val columnIndices = From a12d8dac2f094db660751151f1a0b4eed82af014 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 23 Oct 2017 19:34:35 +0100 Subject: [PATCH 10/11] address review comments --- .../columnar/InMemoryTableScanExec.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 86d4ac5dd4207..32f810d233597 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -46,8 +46,7 @@ case class InMemoryTableScanExec( override val supportCodegen: Boolean = { // In the initial implementation, for ease of review // support only primitive data types and # of fields is less than wholeStageMaxNumFields - val schema = StructType.fromAttributes(relation.output) - schema.fields.find(f => f.dataType match { + relation.schema.fields.find(f => f.dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType => false case _ => true @@ -78,14 +77,11 @@ case class InMemoryTableScanExec( } override def inputRDDs(): Seq[RDD[InternalRow]] = { - if (supportCodegen) { - val buffers = relation.cachedColumnBuffers - // HACK ALERT: This is actually an RDD[ColumnarBatch]. - // We're taking advantage of Scala's type erasure here to pass these batches along. - Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]]) - } else { - Seq() - } + assert(supportCodegen) + val buffers = relation.cachedColumnBuffers + // HACK ALERT: This is actually an RDD[ColumnarBatch]. + // We're taking advantage of Scala's type erasure here to pass these batches along. + Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]]) } override def output: Seq[Attribute] = attributes From db61b41a61da5d484742ee8f0dfa53e1486b0456 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 24 Oct 2017 04:57:14 +0100 Subject: [PATCH 11/11] address review comment --- .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../sql/execution/columnar/InMemoryTableScanExec.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index de600c6e085dd..e37d133ff336a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -509,7 +509,7 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { val hasTooManyOutputFields = WholeStageCodegenExec.isTooManyFields(conf, plan.schema) val hasTooManyInputFields = - plan.children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isDefined + plan.children.exists(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)) !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 32f810d233597..3307c091a7a86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -46,11 +46,11 @@ case class InMemoryTableScanExec( override val supportCodegen: Boolean = { // In the initial implementation, for ease of review // support only primitive data types and # of fields is less than wholeStageMaxNumFields - relation.schema.fields.find(f => f.dataType match { + relation.schema.fields.forall(f => f.dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | - FloatType | DoubleType => false - case _ => true - }).isEmpty && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) + FloatType | DoubleType => true + case _ => false + }) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) } private val columnIndices =