diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala index 7def44bd2a2b..7576faa99c96 100644 --- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala +++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala @@ -69,12 +69,17 @@ private[spark] class Benchmark( * @param name of the benchmark case * @param numIters if non-zero, forces exactly this many iterations to be run */ - def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = { - addTimerCase(name, numIters) { timer => + def addCase( + name: String, + numIters: Int = 0, + prepare: () => Unit = () => { }, + cleanup: () => Unit = () => { })(f: Int => Unit): Unit = { + val timedF = (timer: Benchmark.Timer) => { timer.startTiming() f(timer.iteration) timer.stopTiming() } + benchmarks += Benchmark.Case(name, timedF, numIters, prepare, cleanup) } /** @@ -101,7 +106,12 @@ private[spark] class Benchmark( val results = benchmarks.map { c => println(" Running case: " + c.name) - measure(valuesPerIteration, c.numIters)(c.fn) + try { + c.prepare() + measure(valuesPerIteration, c.numIters)(c.fn) + } finally { + c.cleanup() + } } println @@ -188,7 +198,12 @@ private[spark] object Benchmark { } } - case class Case(name: String, fn: Timer => Unit, numIters: Int) + case class Case( + name: String, + fn: Timer => Unit, + numIters: Int, + prepare: () => Unit = () => { }, + cleanup: () => Unit = () => { }) case class Result(avgMs: Double, bestRate: Double, bestMs: Double) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala new file mode 100644 index 000000000000..a3fc55cf44c1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.types.DataType + + +/** + * Helper trait for abstracting scan functionality using + * [[org.apache.spark.sql.execution.vectorized.ColumnarBatch]]es. 
+ */ +private[sql] trait ColumnarBatchScan extends CodegenSupport { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + + /** + * Generate [[ColumnVector]] expressions for our parent to consume as rows. + * This is called once per [[ColumnarBatch]]. + */ + private def genCodeColumnVector( + ctx: CodegenContext, + columnVar: String, + ordinal: String, + dataType: DataType, + nullable: Boolean): ExprCode = { + val javaType = ctx.javaType(dataType) + val value = ctx.getValue(columnVar, dataType, ordinal) + val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" } + val valueVar = ctx.freshName("value") + val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]" + val code = s"${ctx.registerComment(str)}\n" + (if (nullable) { + s""" + boolean $isNullVar = $columnVar.isNullAt($ordinal); + $javaType $valueVar = $isNullVar ? ${ctx.defaultValue(dataType)} : ($value); + """ + } else { + s"$javaType $valueVar = $value;" + }).trim + ExprCode(code, isNullVar, valueVar) + } + + /** + * Produce code to process the input iterator as [[ColumnarBatch]]es. + * This produces an [[UnsafeRow]] for each row in each batch. + */ + // TODO: return ColumnarBatch.Rows instead + override protected def doProduce(ctx: CodegenContext): String = { + val input = ctx.freshName("input") + // PhysicalRDD always just has one input + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + + // metrics + val numOutputRows = metricTerm(ctx, "numOutputRows") + val scanTimeMetric = metricTerm(ctx, "scanTime") + val scanTimeTotalNs = ctx.freshName("scanTime") + ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") + + val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" + val batch = ctx.freshName("batch") + ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") + + val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" + val idx = ctx.freshName("batchIdx") + ctx.addMutableState("int", idx, s"$idx = 0;") + val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) + val columnAssigns = colVars.zipWithIndex.map { case (name, i) => + ctx.addMutableState(columnVectorClz, name, s"$name = null;") + s"$name = $batch.column($i);" + } + + val nextBatch = ctx.freshName("nextBatch") + ctx.addNewFunction(nextBatch, + s""" + |private void $nextBatch() throws java.io.IOException { + | long getBatchStart = System.nanoTime(); + | if ($input.hasNext()) { + | $batch = ($columnarBatchClz)$input.next(); + | $numOutputRows.add($batch.numRows()); + | $idx = 0; + | ${columnAssigns.mkString("", "\n", "\n")} + | } + | $scanTimeTotalNs += System.nanoTime() - getBatchStart; + |}""".stripMargin) + + ctx.currentVars = null + val rowidx = ctx.freshName("rowIdx") + val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => + genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) + } + s""" + |if ($batch == null) { + | $nextBatch(); + |} + |while ($batch != null) { + | int numRows = $batch.numRows(); + | while ($idx < numRows) { + | int $rowidx = $idx++; + | ${consume(ctx, columnsBatchInput).trim} + | if (shouldStop()) return; + | } + | $batch = null; + | $nextBatch(); + |} + |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); + |$scanTimeTotalNs = 0; + """.stripMargin + } + +} diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index e2c23a4ba867..8489179521e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.execution import org.apache.commons.lang.StringUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources.HadoopFsRelation @@ -229,11 +229,7 @@ private[sql] case class BatchedDataSourceScanExec( override val outputPartitioning: Partitioning, override val metadata: Map[String, String], override val metastoreTableIdentifier: Option[TableIdentifier]) - extends DataSourceScanExec with CodegenSupport { - - private[sql] override lazy val metrics = - Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + extends DataSourceScanExec with ColumnarBatchScan { protected override def doExecute(): RDD[InternalRow] = { // in the case of fallback, this batched scan should never fail because of: @@ -253,88 +249,6 @@ private[sql] case class BatchedDataSourceScanExec( override def inputRDDs(): Seq[RDD[InternalRow]] = { rdd :: Nil } - - private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String, - dataType: DataType, nullable: Boolean): ExprCode = { - val javaType = ctx.javaType(dataType) - val value = ctx.getValue(columnVar, dataType, ordinal) - val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" } - val valueVar = ctx.freshName("value") - val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]" - val code = s"${ctx.registerComment(str)}\n" + (if (nullable) { - s""" - boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal); - $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value); - """ - } else { - s"$javaType ${valueVar} = $value;" - }).trim - ExprCode(code, isNullVar, valueVar) - } - - // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen - // never requires UnsafeRow as input. 
- override protected def doProduce(ctx: CodegenContext): String = { - val input = ctx.freshName("input") - // PhysicalRDD always just has one input - ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") - - // metrics - val numOutputRows = metricTerm(ctx, "numOutputRows") - val scanTimeMetric = metricTerm(ctx, "scanTime") - val scanTimeTotalNs = ctx.freshName("scanTime") - ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") - - val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" - val batch = ctx.freshName("batch") - ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") - - val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" - val idx = ctx.freshName("batchIdx") - ctx.addMutableState("int", idx, s"$idx = 0;") - val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) - val columnAssigns = colVars.zipWithIndex.map { case (name, i) => - ctx.addMutableState(columnVectorClz, name, s"$name = null;") - s"$name = $batch.column($i);" - } - - val nextBatch = ctx.freshName("nextBatch") - ctx.addNewFunction(nextBatch, - s""" - |private void $nextBatch() throws java.io.IOException { - | long getBatchStart = System.nanoTime(); - | if ($input.hasNext()) { - | $batch = ($columnarBatchClz)$input.next(); - | $numOutputRows.add($batch.numRows()); - | $idx = 0; - | ${columnAssigns.mkString("", "\n", "\n")} - | } - | $scanTimeTotalNs += System.nanoTime() - getBatchStart; - |}""".stripMargin) - - ctx.currentVars = null - val rowidx = ctx.freshName("rowIdx") - val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) - } - s""" - |if ($batch == null) { - | $nextBatch(); - |} - |while ($batch != null) { - | int numRows = $batch.numRows(); - | while ($idx < numRows) { - | int $rowidx = $idx++; - | ${consume(ctx, columnsBatchInput).trim} - | if (shouldStop()) return; - | } - | $batch = null; - | $nextBatch(); - |} - |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); - |$scanTimeTotalNs = 0; - """.stripMargin - } } private[sql] object DataSourceScanExec { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 7a14879b8b9d..e228bbc257b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -203,7 +203,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera return false; } - ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next(); + ${classOf[CachedBatchBytes].getName} batch = + (${classOf[CachedBatchBytes].getName}) input.next(); currentRow = 0; numRowsInBatch = batch.numRows(); for (int i = 0; i < columnIndexes.length; i ++) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnarBatch.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnarBatch.scala new file mode 100644 index 000000000000..1becca162fca --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnarBatch.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter} +import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator +import org.apache.spark.sql.execution.vectorized.ColumnarBatch +import org.apache.spark.sql.types._ + + +/** + * A helper class to expose the scala iterator to Java. + */ +abstract class ColumnarBatchIterator extends Iterator[ColumnarBatch] + + +/** + * Generate code to batch [[InternalRow]]s into [[ColumnarBatch]]es. + */ +class GenerateColumnarBatch( + schema: StructType, + batchSize: Int) + extends CodeGenerator[Iterator[InternalRow], Iterator[ColumnarBatch]] { + + protected def canonicalize(in: Iterator[InternalRow]): Iterator[InternalRow] = in + + protected def bind( + in: Iterator[InternalRow], + inputSchema: Seq[Attribute]): Iterator[InternalRow] = { + in + } + + protected def create(rowIterator: Iterator[InternalRow]): Iterator[ColumnarBatch] = { + import scala.collection.JavaConverters._ + val ctx = newCodeGenContext() + val batchVar = ctx.freshName("columnarBatch") + val rowNumVar = ctx.freshName("rowNum") + val numBytesVar = ctx.freshName("bytesInBatch") + val rowIterVar = ctx.addReferenceObj( + "rowIterator", rowIterator.asJava, classOf[java.util.Iterator[_]].getName) + val schemaVar = ctx.addReferenceObj("schema", schema, classOf[StructType].getName) + val maxNumBytes = ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE + // Code to populate column vectors with the values of the input rows + val populateColumnVectorsCode = schema.fields.zipWithIndex.map { case (field, i) => + val typeName = GenerateColumnarBatch.typeToName(field.dataType) + val put = "put" + typeName.capitalize + val get = "get" + typeName.capitalize + s""" + $batchVar.column($i).$put($rowNumVar, row.$get($i)); + $numBytesVar += ${field.dataType.defaultSize}; + """.trim + }.mkString("\n") + val code = s""" + import org.apache.spark.memory.MemoryMode; + import org.apache.spark.sql.catalyst.InternalRow; + import org.apache.spark.sql.execution.vectorized.ColumnarBatch; + + public GeneratedColumnarBatchIterator generate(Object[] references) { + return new GeneratedColumnarBatchIterator(references); + } + + class GeneratedColumnarBatchIterator extends ${classOf[ColumnarBatchIterator].getName} { + ${ctx.declareMutableStates()} + + public GeneratedColumnarBatchIterator(Object[] references) { + ${ctx.initMutableStates()} + } + + @Override + public boolean hasNext() { + return $rowIterVar.hasNext(); + } + + @Override + public ColumnarBatch next() { + ColumnarBatch $batchVar = + ColumnarBatch.allocate($schemaVar, MemoryMode.ON_HEAP, $batchSize); + int $rowNumVar = 0; + long $numBytesVar = 0; + while ($rowIterVar.hasNext() && 
$rowNumVar < $batchSize && $numBytesVar < $maxNumBytes) { + InternalRow row = (InternalRow) $rowIterVar.next(); + $populateColumnVectorsCode + $rowNumVar += 1; + } + $batchVar.setNumRows($rowNumVar); + return $batchVar; + } + } + """ + val formattedCode = CodeFormatter.stripOverlappingComments( + new CodeAndComment(code, ctx.getPlaceHolderToComments())) + CodeGenerator.compile(formattedCode).generate(ctx.references.toArray) + .asInstanceOf[Iterator[ColumnarBatch]] + } + +} + + +private[columnar] object GenerateColumnarBatch { + + private val typeToName = Map[DataType, String]( + BooleanType -> "boolean", + ByteType -> "byte", + ShortType -> "short", + IntegerType -> "int", + LongType -> "long", + FloatType -> "float", + DoubleType -> "double") + + /** + * Whether [[ColumnarBatch]]-based caching is supported for the given data type + */ + def isSupported(dataType: DataType): Boolean = { + typeToName.contains(dataType) + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 02866c76cb7a..2b54f6a0262d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -30,10 +30,40 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.vectorized.ColumnarBatch +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType import org.apache.spark.storage.StorageLevel import org.apache.spark.util.CollectionAccumulator +/** + * An abstract representation of a cached batch of rows. + */ +private[columnar] trait CachedBatch + + +/** + * A cached batch of rows stored as a list of byte arrays, one for each column. + * + * @param numRows The total number of rows in this batch + * @param buffers The serialized buffers for serialized columns + * @param stats The stat of columns + */ +private[columnar] case class CachedBatchBytes( + numRows: Int, + buffers: Array[Array[Byte]], + stats: InternalRow) + extends CachedBatch + + +/** + * A cached batch of rows stored as a [[ColumnarBatch]]. + */ +private[columnar] case class CachedColumnarBatch(columnarBatch: ColumnarBatch) + extends CachedBatch + + private[sql] object InMemoryRelation { def apply( useCompression: Boolean, @@ -46,15 +76,11 @@ private[sql] object InMemoryRelation { /** - * CachedBatch is a cached batch of rows. + * Container for a physical plan that should be cached in memory. * - * @param numRows The total number of rows in this batch - * @param buffers The buffers for serialized columns - * @param stats The stat of columns + * This batches the rows from that plan into [[CachedBatch]]es that are later consumed by + * [[InMemoryTableScanExec]]. */ -private[columnar] -case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) - private[sql] case class InMemoryRelation( output: Seq[Attribute], useCompression: Boolean, @@ -67,6 +93,17 @@ private[sql] case class InMemoryRelation( child.sqlContext.sparkContext.collectionAccumulator[InternalRow]) extends logical.LeafNode with MultiInstanceRelation { + /** + * If true, store the input rows using [[CachedColumnarBatch]]es, which are generally faster. 
+ * If false, store the input rows using [[CachedBatchBytes]]. + */ + private[columnar] val useColumnarBatches: Boolean = { + val enabled = child.sqlContext.conf.getConf(SQLConf.CACHE_CODEGEN) + // Fallback to storing the rows as bytes if the schema has non-primitive types + val supported = output.forall { a => GenerateColumnarBatch.isSupported(a.dataType) } + enabled && supported + } + override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) override def producedAttributes: AttributeSet = outputSet @@ -92,23 +129,33 @@ private[sql] case class InMemoryRelation( } } - // If the cached column buffers were not passed in, we calculate them in the constructor. - // As in Spark, the actual work of caching is lazy. - if (_cachedColumnBuffers == null) { - buildBuffers() - } - - def recache(): Unit = { - _cachedColumnBuffers.unpersist() - _cachedColumnBuffers = null - buildBuffers() + /** + * Batch the input rows into [[CachedBatch]]es. + */ + private def buildColumnBuffers(): RDD[CachedBatch] = { + val buffers = + if (useColumnarBatches) { + buildColumnarBatches() + } else { + buildColumnBytes() + } + buffers.setName( + tableName.map { n => s"In-memory table $n" } + .getOrElse(StringUtils.abbreviate(child.toString, 1024))) + buffers.asInstanceOf[RDD[CachedBatch]] } - private def buildBuffers(): Unit = { + /** + * Batch the input rows into [[CachedBatchBytes]] built using [[ColumnBuilder]]s. + * + * This handles complex types and compression, but is more expensive than + * [[buildColumnarBatches]], which generates code to build the buffers. + */ + private def buildColumnBytes(): RDD[CachedBatchBytes] = { val output = child.output - val cached = child.execute().mapPartitionsInternal { rowIterator => - new Iterator[CachedBatch] { - def next(): CachedBatch = { + child.execute().mapPartitionsInternal { rowIterator => + new Iterator[CachedBatchBytes] { + def next(): CachedBatchBytes = { val columnBuilders = output.map { attribute => ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) }.toArray @@ -143,7 +190,7 @@ private[sql] case class InMemoryRelation( .flatMap(_.values)) batchStats.add(stats) - CachedBatch(rowCount, columnBuilders.map { builder => + CachedBatchBytes(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, stats) } @@ -151,11 +198,30 @@ private[sql] case class InMemoryRelation( def hasNext: Boolean = rowIterator.hasNext } }.persist(storageLevel) + } - cached.setName( - tableName.map(n => s"In-memory table $n") - .getOrElse(StringUtils.abbreviate(child.toString, 1024))) - _cachedColumnBuffers = cached + /** + * Batch the input rows using [[ColumnarBatch]]es. + * + * Compared with [[buildColumnBytes]], this provides a faster implementation of memory + * scan because both the read path and the write path are generated. This only supports + * basic primitive types and does not compress data, however. 
+ */ + private def buildColumnarBatches(): RDD[CachedColumnarBatch] = { + val schema = StructType.fromAttributes(child.output) + child.execute().mapPartitionsInternal { rows => + new GenerateColumnarBatch(schema, batchSize).generate(rows).map { b => + CachedColumnarBatch(b) + } + }.persist(storageLevel) + } + + def recache(): Unit = { + if (_cachedColumnBuffers != null) { + _cachedColumnBuffers.unpersist() + _cachedColumnBuffers = null + } + _cachedColumnBuffers = buildColumnBuffers() } def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { @@ -176,7 +242,15 @@ private[sql] case class InMemoryRelation( batchStats).asInstanceOf[this.type] } - def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers + /** + * Return lazily cached batches of rows in the original plan. + */ + def cachedColumnBuffers: RDD[CachedBatch] = { + if (_cachedColumnBuffers == null) { + _cachedColumnBuffers = buildColumnBuffers() + } + _cachedColumnBuffers + } override protected def otherCopyArgs: Seq[AnyRef] = Seq(_cachedColumnBuffers, batchStats) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 2695f356cd3e..3952b14a331c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.LeafExecNode -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode} import org.apache.spark.sql.types.UserDefinedType @@ -32,13 +31,25 @@ private[sql] case class InMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], @transient relation: InMemoryRelation) - extends LeafExecNode { + extends LeafExecNode with ColumnarBatchScan { + + override val supportCodegen: Boolean = relation.useColumnarBatches + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + if (relation.useColumnarBatches) { + // HACK ALERT: This is actually an RDD[CachedColumnarBatch]. + // We're taking advantage of Scala's type erasure here to pass these batches along. + Seq(relation.cachedColumnBuffers + .asInstanceOf[RDD[CachedColumnarBatch]] + .map(_.columnarBatch) + .asInstanceOf[RDD[InternalRow]]) + } else { + Seq() + } + } override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) - override def output: Seq[Attribute] = attributes // The cached version does not change the outputPartitioning of the original SparkPlan. 
@@ -109,6 +120,8 @@ private[sql] case class InMemoryTableScanExec( private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning protected override def doExecute(): RDD[InternalRow] = { + assert(!relation.useColumnarBatches) + assert(relation.cachedColumnBuffers != null) val numOutputRows = longMetric("numOutputRows") if (enableAccumulators) { @@ -121,7 +134,7 @@ private[sql] case class InMemoryTableScanExec( val schema = relation.partitionStatistics.schema val schemaIndex = schema.zipWithIndex val relOutput: AttributeSeq = relation.output - val buffers = relation.cachedColumnBuffers + val buffers = relation.cachedColumnBuffers.asInstanceOf[RDD[CachedBatchBytes]] buffers.mapPartitionsInternal { cachedBatchIterator => val partitionFilter = newPredicate( @@ -175,4 +188,5 @@ private[sql] case class InMemoryTableScanExec( columnarIterator } } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1a9bb6a0b54e..4732f4f4d10f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -91,6 +91,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CACHE_CODEGEN = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.codegen") + .internal() + .doc("When true, use generated code to build column batches for caching. This is only " + + "supported for basic types and improves caching performance for such types.") + .booleanConf + .createWithDefault(true) + val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin") .internal() .doc("When true, prefer sort merge join over shuffle hash join.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CacheBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CacheBenchmark.scala new file mode 100644 index 000000000000..b904d09a4cc0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CacheBenchmark.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Benchmark + + +class CacheBenchmark extends BenchmarkBase { + + test("cache with randomized keys - end-to-end") { + benchmarkRandomizedKeys(size = 20 << 18, readPathOnly = false) + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + + Cache random keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ---------------------------------------------------------------------------------------------- + cache = F 641 / 667 8.2 122.2 1.0X + cache = T columnar_batches = F compress = F 1696 / 1833 3.1 323.6 0.4X + cache = T columnar_batches = F compress = T 7517 / 7748 0.7 1433.8 0.1X + cache = T columnar_batches = T 1023 / 1102 5.1 195.0 0.6X + */ + } + + test("cache with randomized keys - read path only") { + benchmarkRandomizedKeys(size = 20 << 21, readPathOnly = true) + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + + Cache random keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ----------------------------------------------------------------------------------------------- + cache = F 890 / 920 47.1 21.2 1.0X + cache = T columnar_batches = F compress = F 1950 / 1978 21.5 46.5 0.5X + cache = T columnar_batches = F compress = T 1893 / 1927 22.2 45.1 0.5X + cache = T columnar_batches = T 540 / 544 77.7 12.9 1.6X + */ + } + + /** + * Call collect on a [[DataFrame]] after deleting all existing temporary files. + * This also checks whether the collected result matches the expected answer. + */ + private def collect(df: DataFrame, expectedAnswer: Seq[Row]): Unit = { + df.sparkSession.sparkContext.parallelize(1 to 10, 10).foreach { _ => + SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach { dir => + dir.delete() + } + } + QueryTest.checkAnswer(df, expectedAnswer) match { + case Some(errMessage) => throw new RuntimeException(errMessage) + case None => // all good + } + } + + /** + * Benchmark caching randomized keys created from a range. + * + * NOTE: When running this benchmark, you will get a lot of WARN logs complaining that the + * shuffle files do not exist. This is intentional; we delete the shuffle files manually + * after every call to `collect` to avoid the next run to reuse shuffle files written by + * the previous run. + */ + private def benchmarkRandomizedKeys(size: Int, readPathOnly: Boolean): Unit = { + val numIters = 10 + val benchmark = new Benchmark("Cache random keys", size) + sparkSession.range(size) + .selectExpr("id", "floor(rand() * 10000) as k") + .createOrReplaceTempView("test") + val query = "select count(k), count(id) from test" + val expectedAnswer = sparkSession.sql(query).collect().toSeq + + /** + * Add a benchmark case, optionally specifying whether to cache the dataset. 
+ */ + def addBenchmark(name: String, cache: Boolean, params: Map[String, String] = Map()): Unit = { + val ds = sparkSession.sql(query) + val defaults = params.keys.flatMap { k => sparkSession.conf.getOption(k).map((k, _)) } + def prepare(): Unit = { + params.foreach { case (k, v) => sparkSession.conf.set(k, v) } + if (cache) { sparkSession.catalog.cacheTable("test") } + if (readPathOnly) { + collect(ds, expectedAnswer) + } + } + def cleanup(): Unit = { + defaults.foreach { case (k, v) => sparkSession.conf.set(k, v) } + sparkSession.catalog.clearCache() + } + benchmark.addCase(name, numIters, prepare, cleanup) { _ => + if (readPathOnly) { + collect(ds, expectedAnswer) + } else { + // also benchmark the time it takes to build the column buffers + val ds2 = sparkSession.sql(query) + collect(ds2, expectedAnswer) + collect(ds2, expectedAnswer) + } + } + } + + // All of these are codegen = T hashmap = T + sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + sparkSession.conf.set(SQLConf.VECTORIZED_AGG_MAP_MAX_COLUMNS.key, "1024") + + // Benchmark cases: + // (1) No caching + // (2) Caching without compression + // (3) Caching with compression + // (4) Caching with column batches (without compression) + addBenchmark("cache = F", cache = false) + addBenchmark("cache = T columnar_batches = F compress = F", cache = true, Map( + SQLConf.CACHE_CODEGEN.key -> "false", + SQLConf.COMPRESS_CACHED.key -> "false" + )) + addBenchmark("cache = T columnar_batches = F compress = T", cache = true, Map( + SQLConf.CACHE_CODEGEN.key -> "false", + SQLConf.COMPRESS_CACHED.key -> "true" + )) + addBenchmark("cache = T columnar_batches = T", cache = true, Map( + SQLConf.CACHE_CODEGEN.key -> "true" + )) + benchmark.run() + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index af3ed14c122d..d296b1ef2b8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -20,12 +20,14 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ -import org.apache.spark.storage.StorageLevel.MEMORY_ONLY +import org.apache.spark.storage.StorageLevel.{MEMORY_ONLY, NONE} class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -231,4 +233,43 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val columnTypes2 = List.fill(length2)(IntegerType) val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2) } + + test("InMemoryRelation builds the correct buffers") { + testColumnBatches(useColumnBatches = true, useComplexSchema = false) + testColumnBatches(useColumnBatches = false, useComplexSchema = false) + } + + test("InMemoryRelation falls back on non-codegen path with complex schemas") { + testColumnBatches(useColumnBatches = true, useComplexSchema = true) + testColumnBatches(useColumnBatches = false, useComplexSchema = true) 
+ } + + private def testColumnBatches(useColumnBatches: Boolean, useComplexSchema: Boolean = false) { + withSQLConf(SQLConf.CACHE_CODEGEN.key -> useColumnBatches.toString) { + val logicalPlan = org.apache.spark.sql.catalyst.plans.logical.Range(1, 10, 1, 10) + val sparkPlan = new org.apache.spark.sql.execution.RangeExec(logicalPlan) { + override val output: Seq[Attribute] = { + if (useComplexSchema) { + Seq(AttributeReference("complex", ArrayType(LongType))()) + } else { + logicalPlan.output + } + } + } + val inMemoryRelation = InMemoryRelation( + useCompression = false, + batchSize = 100, + storageLevel = MEMORY_ONLY, + child = sparkPlan, + tableName = None) + assert(inMemoryRelation.useColumnarBatches == useColumnBatches && !useComplexSchema) + assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == MEMORY_ONLY) + inMemoryRelation.cachedColumnBuffers.collect().head match { + case _: CachedColumnarBatch => assert(useColumnBatches && !useComplexSchema) + case _: CachedBatchBytes => assert(!useColumnBatches || useComplexSchema) + case other => fail(s"Unexpected cached batch type: ${other.getClass.getName}") + } + } + } + }