diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 7d16118c9d59..00d4a26c846a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -100,3 +100,28 @@ object BindReferences extends Logging { }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible. } } + +/** + * A column vector reference points to a specific column for ColumnVector. + * columnVar is a variable that keeps ColumnVector, and ordinal is row index in ColumnVector + */ +case class ColumnVectorReference( + columnVar: String, ordinal: String, dataType: DataType, nullable: Boolean) + extends LeafExpression { + + override def toString: String = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]" + + override def eval(input: InternalRow): Any = null + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val javaType = ctx.javaType(dataType) + val value = ctx.getValue(columnVar, dataType, ordinal) + if (nullable) { + ev.copy(code = s""" + boolean ${ev.isNull} = ${columnVar}.isNullAt($ordinal); + $javaType ${ev.value} = ${ev.isNull} ? ${ctx.defaultValue(dataType)} : ($value);""") + } else { + ev.copy(code = s"""$javaType ${ev.value} = $value;""", isNull = "false") + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 6392ff42d709..e56e2922074e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -123,11 +123,12 @@ class CodegenContext { * * They will be kept as member variables in generated classes like `SpecificProjection`. */ - val mutableStates: mutable.ArrayBuffer[(String, String, String)] = - mutable.ArrayBuffer.empty[(String, String, String)] + val mutableStates: mutable.ArrayBuffer[(String, String, String, String)] = + mutable.ArrayBuffer.empty[(String, String, String, String)] - def addMutableState(javaType: String, variableName: String, initCode: String): Unit = { - mutableStates += ((javaType, variableName, initCode)) + def addMutableState(javaType: String, variableName: String, initCode: String, + cleanupCode: String = ""): Unit = { + mutableStates += ((javaType, variableName, initCode, cleanupCode)) } /** @@ -149,7 +150,7 @@ class CodegenContext { def declareMutableStates(): String = { // It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in // `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones. - mutableStates.distinct.map { case (javaType, variableName, _) => + mutableStates.distinct.map { case (javaType, variableName, _, _) => s"private $javaType $variableName;" }.mkString("\n") } @@ -160,6 +161,9 @@ class CodegenContext { mutableStates.distinct.map(_._3).mkString("\n") } + def cleanupMutableStates(): String = { + mutableStates.map(_._4).mkString("\n") + } /** * Holding all the functions those will be added into generated class. */ @@ -206,6 +210,10 @@ class CodegenContext { /** The variable name of the input row in generated code. */ final var INPUT_ROW = "i" + var isRow = true + var enableColumnCodeGen = false + var iteratorInput = "" + /** * The map from a variable name to it's next ID. */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ByteBufferColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ByteBufferColumnVector.java new file mode 100644 index 000000000000..0392384007c6 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ByteBufferColumnVector.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow; +import org.apache.spark.sql.catalyst.expressions.MutableRow; +import org.apache.spark.sql.execution.columnar.BasicColumnAccessor; +import org.apache.spark.sql.execution.columnar.ByteBufferHelper; +import org.apache.spark.sql.execution.columnar.NativeColumnAccessor; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.Platform; + +/** + * A column backed by an in memory JVM array. This stores the NULLs as a byte per value + * and a java array for the values. + */ +public final class ByteBufferColumnVector extends ColumnVector { + // The data stored in these arrays need to maintain binary compatible. We can + // directly pass this buffer to external components. + + // This is faster than a boolean array and we optimize this over memory footprint. + private byte[] nulls; + + // Array for each type. Only 1 is populated for any type. + private byte[] data; + private long offset; + + protected ByteBufferColumnVector(int capacity, DataType type, + boolean isConstant, ByteBuffer buffer, ByteBuffer nullsBuffer) { + super(capacity, type, MemoryMode.ON_HEAP); + if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { + throw new NotImplementedException(); + } else if ((type instanceof FloatType) || (type instanceof DoubleType)) { + data = buffer.array(); + offset = Platform.BYTE_ARRAY_OFFSET + buffer.position(); + } else if (resultStruct != null) { + // Nothing to store. + } else { + throw new RuntimeException("Unhandled " + type); + } + nulls = new byte[capacity]; + reset(); + + int numNulls = ByteBufferHelper.getInt(nullsBuffer); + for (int i = 0; i < numNulls; i++) { + int cordinal = ByteBufferHelper.getInt(nullsBuffer); + putNull(cordinal); + } + if (isConstant) { + setIsConstant(); + } + } + + @Override + public long valuesNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public long nullsNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public void close() { + } + + // + // APIs dealing with nulls + // + + @Override + public void putNotNull(int rowId) { + nulls[rowId] = (byte)0; + } + + @Override + public void putNull(int rowId) { + nulls[rowId] = (byte)1; + ++numNulls; + anyNullsSet = true; + } + + @Override + public void putNulls(int rowId, int count) { + for (int i = 0; i < count; ++i) { + nulls[rowId + i] = (byte)1; + } + anyNullsSet = true; + numNulls += count; + } + + @Override + public void putNotNulls(int rowId, int count) { + if (!anyNullsSet) return; + for (int i = 0; i < count; ++i) { + nulls[rowId + i] = (byte)0; + } + } + + @Override + public boolean isNullAt(int rowId) { + return nulls[rowId] == 1; + } + + // + // APIs dealing with Booleans + // + + @Override + public void putBoolean(int rowId, boolean value) { + throw new NotImplementedException(); + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { + throw new NotImplementedException(); + } + + @Override + public boolean getBoolean(int rowId) { + throw new NotImplementedException(); + } + + // + // APIs dealing with Bytes + // + + @Override + public void putByte(int rowId, byte value) { + throw new NotImplementedException(); + } + + @Override + public void putBytes(int rowId, int count, byte value) { + throw new NotImplementedException(); + } + + @Override + public void putBytes(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public byte getByte(int rowId) { + throw new NotImplementedException(); + } + + // + // APIs dealing with Shorts + // + + @Override + public void putShort(int rowId, short value) { + throw new NotImplementedException(); + } + + @Override + public void putShorts(int rowId, int count, short value) { + throw new NotImplementedException(); + } + + @Override + public void putShorts(int rowId, int count, short[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public short getShort(int rowId) { + throw new NotImplementedException(); + } + + + // + // APIs dealing with Ints + // + + @Override + public void putInt(int rowId, int value) { + throw new NotImplementedException(); + } + + @Override + public void putInts(int rowId, int count, int value) { + throw new NotImplementedException(); + } + + @Override + public void putInts(int rowId, int count, int[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public int getInt(int rowId) { + throw new NotImplementedException(); + } + + // + // APIs dealing with Longs + // + + @Override + public void putLong(int rowId, long value) { + throw new NotImplementedException(); + } + + @Override + public void putLongs(int rowId, int count, long value) { + throw new NotImplementedException(); + } + + @Override + public void putLongs(int rowId, int count, long[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public long getLong(int rowId) { + throw new NotImplementedException(); + } + + // + // APIs dealing with floats + // + + @Override + public void putFloat(int rowId, float value) { + throw new NotImplementedException(); + } + + @Override + public void putFloats(int rowId, int count, float value) { + throw new NotImplementedException(); + } + + @Override + public void putFloats(int rowId, int count, float[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putFloats(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public float getFloat(int rowId) { + assert(dictionary == null); + return Platform.getFloat(data, offset + rowId * 4); + } + + // + // APIs dealing with doubles + // + + @Override + public void putDouble(int rowId, double value) { + throw new NotImplementedException(); + } + + @Override + public void putDoubles(int rowId, int count, double value) { + throw new NotImplementedException(); + } + + @Override + public void putDoubles(int rowId, int count, double[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public double getDouble(int rowId) { + assert(dictionary == null); + return Platform.getDouble(data, offset + rowId * 8); + } + + // + // APIs dealing with Arrays + // + + @Override + public int getArrayLength(int rowId) { throw new NotImplementedException(); } + @Override + public int getArrayOffset(int rowId) { throw new NotImplementedException(); } + + @Override + public void putArray(int rowId, int offset, int length) { + throw new NotImplementedException(); + } + + @Override + public void loadBytes(ColumnVector.Array array) { + throw new NotImplementedException(); + } + + // + // APIs dealing with Byte Arrays + // + + @Override + public int putByteArray(int rowId, byte[] value, int offset, int length) { + throw new NotImplementedException(); + } + + @Override + public void reserve(int requiredCapacity) { + if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); + } + + // Spilt this function out since it is the slow path. + @Override + protected void reserveInternal(int newCapacity) { + throw new NotImplementedException(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index bbbb796aca0d..1ea34673dc4f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -18,6 +18,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.ByteBuffer; import com.google.common.annotations.VisibleForTesting; import org.apache.parquet.column.Dictionary; @@ -71,6 +72,11 @@ public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode } } + public static ColumnVector allocate(int capacity, DataType type, + boolean isConstant, ByteBuffer buffer, ByteBuffer nullsBuffer) { + return new ByteBufferColumnVector(capacity, type, isConstant, buffer, nullsBuffer); + } + /** * Holder object to return an array. This object is intended to be reused. Callers should * copy the data out if it needs to be stored. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 6db7f45cfdf2..b414b03724e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -122,6 +122,8 @@ case class SortExec( // Name of sorter variable used in codegen. private var sorterVariable: String = _ + override def useUnsafeRow: Boolean = true + override protected def doProduce(ctx: CodegenContext): String = { val needToSort = ctx.freshName("needToSort") ctx.addMutableState("boolean", needToSort, s"$needToSort = true;") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index ac4c3aae5f8e..a7dc93f7221a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf @@ -65,6 +66,11 @@ trait CodegenSupport extends SparkPlan { */ protected var parent: CodegenSupport = null + /** + * Whether this SparkPlan uses UnsafeRow as input in doProduce or doConsume + */ + def useUnsafeRow: Boolean = false + /** * Returns all the RDDs of InternalRow which generates the input rows. * @@ -234,10 +240,14 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp } override def doProduce(ctx: CodegenContext): String = { + ctx.enableColumnCodeGen = true val input = ctx.freshName("input") + ctx.iteratorInput = input // Right now, InputAdapter is only used when there is one input RDD. ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") - val row = ctx.freshName("row") + + if (ctx.isRow) { + val row = ctx.freshName("row") s""" | while ($input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); @@ -245,6 +255,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp | if (shouldStop()) return; | } """.stripMargin + } else { + InMemoryTableScanExec.produceColumnLoop(ctx, this, output) + } } override def generateTreeString( @@ -299,6 +312,8 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co "pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext, WholeStageCodegenExec.PIPELINE_DURATION_METRIC)) + var enableColumnCodeGen: Boolean = false + /** * Generates code for this subtree. * @@ -306,7 +321,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co */ def doCodeGen(): (CodegenContext, CodeAndComment) = { val ctx = new CodegenContext - val code = child.asInstanceOf[CodegenSupport].produce(ctx, this) + ctx.isRow = true + val codeRow = child.asInstanceOf[CodegenSupport].produce(ctx, this) + + enableColumnCodeGen = InMemoryTableScanExec.enableColumnCodeGen(sqlContext, ctx, child) + val codeProcessNext = if (!enableColumnCodeGen) { + s""" + protected void processNext() throws java.io.IOException { + ${codeRow.trim} + } + """ + } else { + InMemoryTableScanExec.produceProcessNext(ctx, this, child, codeRow) + } + val source = s""" public Object generate(Object[] references) { return new GeneratedIterator(references); @@ -329,9 +357,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co ${ctx.declareAddedFunctions()} - protected void processNext() throws java.io.IOException { - ${code.trim} - } + ${codeProcessNext} } """.trim diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 185c79f899e6..5ed78aa4a996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -382,6 +382,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) """.stripMargin) val input = ctx.freshName("input") + ctx.iteratorInput = input // Right now, Range is only used when there is one upstream. ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") s""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala index 7cde04b62619..08c2b3913c2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala @@ -61,6 +61,9 @@ private[columnar] abstract class BasicColumnAccessor[JvmType]( } protected def underlyingBuffer = buffer + + def getByteBuffer: ByteBuffer = + buffer.duplicate.order(ByteOrder.nativeOrder()) } private[columnar] class NullColumnAccessor(buffer: ByteBuffer) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala index f9d606e37ea8..2d47e84f9cf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala @@ -43,6 +43,12 @@ import org.apache.spark.unsafe.types.UTF8String * WARNING: This only works with HeapByteBuffer */ private[columnar] object ByteBufferHelper { + def getShort(buffer: ByteBuffer): Short = { + val pos = buffer.position() + buffer.position(pos + 2) + Platform.getShort(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) + } + def getInt(buffer: ByteBuffer): Int = { val pos = buffer.position() buffer.position(pos + 4) @@ -66,6 +72,15 @@ private[columnar] object ByteBufferHelper { buffer.position(pos + 8) Platform.getDouble(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) } + + def copyMemory(src: ByteBuffer, dst: ByteBuffer, len: Int): Unit = { + val srcPos = src.position() + val dstPos = dst.position() + src.position(srcPos + len) + dst.position(dstPos + len) + Platform.copyMemory(src.array(), Platform.BYTE_ARRAY_OFFSET + srcPos, + dst.array(), Platform.BYTE_ARRAY_OFFSET + dstPos, len) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 7a14879b8b9d..b62696ee695d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution.columnar +import scala.collection.Iterator + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodeGenerator, UnsafeRowWriter} +import org.apache.spark.sql.execution.vectorized.ColumnVector import org.apache.spark.sql.types._ /** @@ -28,7 +31,12 @@ import org.apache.spark.sql.types._ */ abstract class ColumnarIterator extends Iterator[InternalRow] { def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType], - columnIndexes: Array[Int]): Unit + columnIndexes: Array[Int], inMemoryTableScanExec: InMemoryTableScanExec): Unit + def getColumnIndexes(index: Int) : Int + def getColumnTypes(index: Int): DataType + def isSupportColumnarCodeGen: Boolean + def initForColumnar: Int + def getColumn(index: Int): ColumnVector } /** @@ -68,6 +76,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera protected def create(columnTypes: Seq[DataType]): ColumnarIterator = { val ctx = newCodeGenContext() val numFields = columnTypes.size + var _isSupportColumnarCodeGen = true val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) => val accessorName = ctx.freshName("accessor") val accessorCls = dt match { @@ -92,10 +101,17 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera val createCode = dt match { case t if ctx.isPrimitiveType(dt) => + dt match { + case FloatType => + case DoubleType => + case _ => _isSupportColumnarCodeGen = false + } s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" case NullType | StringType | BinaryType => + _isSupportColumnarCodeGen = false s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" case other => + _isSupportColumnarCodeGen = false s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder), (${dt.getClass.getName}) columnTypes[$index]);""" } @@ -157,7 +173,10 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; + import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec; + import org.apache.spark.sql.execution.columnar.CachedBatch; import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; + import org.apache.spark.sql.execution.vectorized.ColumnVector; public SpecificColumnarIterator generate(Object[] references) { return new SpecificColumnarIterator(); @@ -170,7 +189,10 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera private UnsafeRow unsafeRow = new UnsafeRow($numFields); private BufferHolder bufferHolder = new BufferHolder(unsafeRow); private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, $numFields); + private InMemoryTableScanExec inMemoryTableScanExec = null; private MutableUnsafeRow mutableRow = null; + private boolean readPartitionIncremented = false; + private CachedBatch cachedBatch = null; private int currentRow = 0; private int numRowsInBatch = 0; @@ -187,10 +209,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera this.mutableRow = new MutableUnsafeRow(rowWriter); } - public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { + public void initialize(Iterator input, DataType[] columnTypes, + int[] columnIndexes, InMemoryTableScanExec inMemoryTableScanExec) { this.input = input; this.columnTypes = columnTypes; this.columnIndexes = columnIndexes; + this.inMemoryTableScanExec = inMemoryTableScanExec; } ${ctx.declareAddedFunctions()} @@ -203,7 +227,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera return false; } - ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next(); + CachedBatch batch = (CachedBatch) input.next(); currentRow = 0; numRowsInBatch = batch.numRows(); for (int i = 0; i < columnIndexes.length; i ++) { @@ -222,6 +246,30 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera unsafeRow.setTotalSize(bufferHolder.totalSize()); return unsafeRow; } + + public int getColumnIndexes(int index) { return columnIndexes[index]; } + + public DataType getColumnTypes(int index) { return columnTypes[index]; } + + public boolean isSupportColumnarCodeGen() { + return ${_isSupportColumnarCodeGen}; + } + + public int initForColumnar() { + if (!input.hasNext()) { + return -1; + } + if ((inMemoryTableScanExec != null) && !readPartitionIncremented) { + inMemoryTableScanExec.incrementReadPartitionAccumulator(); + readPartitionIncremented = true; + } + cachedBatch = (CachedBatch) input.next(); + return cachedBatch.numRows(); + } + + public ColumnVector getColumn(int i) { + return cachedBatch.column(this, i); + } }""" val code = CodeFormatter.stripOverlappingComments( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 079e122a5a85..65643a99c819 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.columnar +import java.nio.{ByteBuffer, ByteOrder} +import java.nio.ByteOrder.nativeOrder + import scala.collection.JavaConverters._ import org.apache.commons.lang3.StringUtils @@ -30,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.vectorized.ColumnVector +import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.CollectionAccumulator @@ -53,7 +58,29 @@ private[sql] object InMemoryRelation { * @param stats The stat of columns */ private[columnar] -case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) +case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) { + def column(columnarIterator: ColumnarIterator, index: Int): ColumnVector = { + val ordinal = columnarIterator.getColumnIndexes(index) + val dataType = columnarIterator.getColumnTypes(index) + val buffer = ByteBuffer.wrap(buffers(ordinal)).order(nativeOrder) + val accessor: BasicColumnAccessor[_] = dataType match { + case FloatType => new FloatColumnAccessor(buffer) + case DoubleType => new DoubleColumnAccessor(buffer) + } + + val (out, nullsBuffer) = if (accessor.isInstanceOf[NativeColumnAccessor[_]]) { + val nativeAccessor = accessor.asInstanceOf[NativeColumnAccessor[_]] + nativeAccessor.decompress(numRows); + } else { + val buffer = accessor.getByteBuffer + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + (buffer, nullsBuffer) + } + + ColumnVector.allocate(numRows, dataType, true, out, nullsBuffer) + } +} private[sql] case class InMemoryRelation( output: Seq[Attribute], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 67a410f539b6..dcedc542014a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -18,14 +18,18 @@ package org.apache.spark.sql.execution.columnar import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.{CodegenSupport, LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.UserDefinedType +import org.apache.spark.sql.execution.vectorized.ColumnVector +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ private[sql] case class InMemoryTableScanExec( @@ -110,6 +114,10 @@ private[sql] case class InMemoryTableScanExec( lazy val readPartitions = sparkContext.longAccumulator lazy val readBatches = sparkContext.longAccumulator + def incrementReadPartitionAccumulator(): Unit = { + readPartitions.add(1) + } + private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning protected override def doExecute(): RDD[InternalRow] = { @@ -172,11 +180,98 @@ private[sql] case class InMemoryTableScanExec( case other => other }.toArray val columnarIterator = GenerateColumnAccessor.generate(columnTypes) - columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray) - if (enableAccumulators && columnarIterator.hasNext) { - readPartitions.add(1) + columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray, + if (!enableAccumulators) null else this) + if (enableAccumulators && !columnarIterator.isSupportColumnarCodeGen && + columnarIterator.hasNext) { + incrementReadPartitionAccumulator } columnarIterator } } } + +private[sql] object InMemoryTableScanExec { + private val columnarItrName = "columnar_itr" + private val columnarBatchIdxName = "columnar_batchIdx" + + def enableColumnCodeGen( + sqlContext: SQLContext, ctx: CodegenContext, child: SparkPlan): Boolean = { + ctx.enableColumnCodeGen && + sqlContext.getConf(SQLConf.COLUMN_VECTOR_CODEGEN.key).toBoolean && + child.find(c => c.isInstanceOf[InMemoryTableScanExec]).isDefined && + child.find(c => c.isInstanceOf[CodegenSupport] && + c.asInstanceOf[CodegenSupport].useUnsafeRow).isEmpty + } + + def produceColumnLoop( + ctx: CodegenContext, codegen: CodegenSupport, output: Seq[Attribute]): String = { + val idx = columnarBatchIdxName + val numRows = "columnar_numRows" + ctx.addMutableState("int", idx, s"$idx = 0;") + ctx.addMutableState("int", numRows, s"$numRows = 0;") + val rowidx = ctx.freshName("rowIdx") + + val colVars = output.indices.map(i => ctx.freshName("col" + i)) + val columnAssigns = colVars.zipWithIndex.map { case (name, i) => + ctx.addMutableState("org.apache.spark.sql.execution.vectorized.ColumnVector", + name, s"$name = null;", s"$name = null;") + s"$name = ${columnarItrName}.getColumn($i);" + } + val columns = (output zip colVars).map { case (attr, colVar) => + new ColumnVectorReference(colVar, rowidx, attr.dataType, attr.nullable).genCode(ctx) } + + s""" + | while (true) { + | if ($idx == 0) { + | $numRows = ${columnarItrName}.initForColumnar(); + | if ($numRows < 0) { + | cleanup(); + | break; + | } + | ${columnAssigns.mkString("", "\n", "")} + | } + | + | while ($idx < $numRows) { + | int $rowidx = $idx++; + | ${codegen.consume(ctx, columns, null).trim} + | if (shouldStop()) return; + | } + | $idx = 0; + | } + """.stripMargin + } + + def produceProcessNext( + ctx: CodegenContext, codegen: CodegenSupport, child: SparkPlan, codeRow: String): String = { + ctx.isRow = false + val codeCol = child.asInstanceOf[CodegenSupport].produce(ctx, codegen) + val columnarItrClz = "org.apache.spark.sql.execution.columnar.ColumnarIterator" + val colItr = columnarItrName + ctx.addMutableState(s"$columnarItrClz", colItr, s"$colItr = null;", s"$colItr = null;") + + s""" + private void processBatch() throws java.io.IOException { + ${codeCol.trim} + } + + private void processRow() throws java.io.IOException { + ${codeRow.trim} + } + + private void cleanup() { + ${ctx.cleanupMutableStates()} + } + + protected void processNext() throws java.io.IOException { + if ((${columnarBatchIdxName} != 0) || + (${ctx.iteratorInput} instanceof $columnarItrClz && + ($colItr = ($columnarItrClz)${ctx.iteratorInput}).isSupportColumnarCodeGen())) { + processBatch(); + } else { + processRow(); + } + } + """.trim + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala index 6579b5068e65..fa2f915e7221 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.columnar.compression +import java.nio.ByteBuffer + import org.apache.spark.sql.catalyst.expressions.MutableRow import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor} import org.apache.spark.sql.types.AtomicType @@ -36,4 +38,6 @@ private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends Colu override def extractSingle(row: MutableRow, ordinal: Int): Unit = { decoder.next(row, ordinal) } + + def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = decoder.decompress(capacity) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala index b90d00b15b18..cecb4d79885c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala @@ -42,6 +42,8 @@ private[columnar] trait Decoder[T <: AtomicType] { def next(row: MutableRow, ordinal: Int): Unit def hasNext: Boolean + + def decompress(capacity: Int): (ByteBuffer, ByteBuffer) } private[columnar] trait CompressionScheme { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala index 941f03b745a0..4a253dea6447 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar.compression import java.nio.ByteBuffer +import java.nio.ByteOrder import scala.collection.mutable @@ -61,6 +62,46 @@ private[columnar] case object PassThrough extends CompressionScheme { } override def hasNext: Boolean = buffer.hasRemaining + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + if (nullCount == 0) { + nullsBuffer.rewind() + (buffer.duplicate().order(ByteOrder.nativeOrder()), nullsBuffer) + } else { + val unitSize = columnType.dataType match { + case _: FloatType => 4 + case _: DoubleType => 8 + case _ => throw new IllegalStateException("Not supported type in PassThru.") + } + var nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + var pos = 0 + var seenNulls = 0 + val out = ByteBuffer.allocate(capacity * unitSize).order(ByteOrder.nativeOrder()) + while (buffer.hasRemaining) { + if (pos != nextNullIndex) { + val len = nextNullIndex - pos + assert(len * unitSize < Int.MaxValue) + ByteBufferHelper.copyMemory(buffer, out, len * unitSize) + pos += len + } else { + seenNulls += 1 + nextNullIndex = if (seenNulls < nullCount) { + ByteBufferHelper.getInt(nullsBuffer) + } else { + capacity + } + out.position(out.position + unitSize) + pos += 1 + } + } + out.rewind() + nullsBuffer.rewind() + (out, nullsBuffer) + } + } } } @@ -169,6 +210,10 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme { } override def hasNext: Boolean = valueCount < run || buffer.hasRemaining + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + throw new IllegalStateException("Not support in RunLengthEncoding.") + } } } @@ -278,6 +323,10 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { } override def hasNext: Boolean = buffer.hasRemaining + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + throw new IllegalStateException("Not support in DictionaryEncoding.") + } } } @@ -368,6 +417,10 @@ private[columnar] case object BooleanBitSet extends CompressionScheme { } override def hasNext: Boolean = visited < count + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + throw new IllegalStateException("Not support in BooleanBitSet") + } } } @@ -448,6 +501,10 @@ private[columnar] case object IntDelta extends CompressionScheme { prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer) row.setInt(ordinal, prev) } + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + throw new IllegalStateException("Not support in IntDelta") + } } } @@ -528,5 +585,9 @@ private[columnar] case object LongDelta extends CompressionScheme { prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer) row.setLong(ordinal, prev) } + + override def decompress(capacity: Int): (ByteBuffer, ByteBuffer) = { + throw new IllegalStateException("Not support in LongDelta") + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index 7c194ab72643..93ec860c95ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -73,6 +73,8 @@ case class BroadcastHashJoinExec( streamedPlan.asInstanceOf[CodegenSupport].inputRDDs() } + override def useUnsafeRow: Boolean = true + override def doProduce(ctx: CodegenContext): String = { streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index fac6b8de8ed5..38e6f681b9d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -502,6 +502,8 @@ case class SortMergeJoinExec( } } + override def useUnsafeRow: Boolean = true + override def doProduce(ctx: CodegenContext): String = { ctx.copyResult = true val leftInput = ctx.freshName("leftInput") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1a9bb6a0b54e..5b9df54364f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -84,6 +84,12 @@ object SQLConf { .intConf .createWithDefault(10000) + val COLUMN_VECTOR_CODEGEN = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.codegen") + .internal() + .doc("When set to true Spark SQL will generate code to access columnar storage.") + .booleanConf + .createWithDefault(true) + val IN_MEMORY_PARTITION_PRUNING = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning") .internal() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCacheBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCacheBenchmark.scala new file mode 100644 index 000000000000..73631a35460e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameCacheBenchmark.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import java.util.Random + +import scala.util.Try + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Benchmark + +/** + * Benchmark to measure performance of columnar storage for dataframe cache. + * To run this: + * bin/spark-submit --class org.apache.spark.sql.DataFrameCacheBenchmark + * sql/core/target/spark-sql_*-tests.jar + * [float datasize scale] [double datasize scale] [master URL] + */ +class DataFrameCacheBenchmark { + + def withSQLConf(sqlContext: SQLContext, pairs: (String, String)*)(f: => Unit): Unit = { + val (keys, values) = pairs.unzip + val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption) + (keys, values).zipped.foreach(sqlContext.conf.setConfString) + try f finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => sqlContext.conf.setConfString(key, value) + case (key, None) => sqlContext.conf.unsetConf(key) + } + } + } + + def floatSumBenchmark(sqlContext: SQLContext, values: Int, iters: Int = 5): Unit = { + import sqlContext.implicits._ + + val suites = Seq(("InternalRow", "false"), ("ColumnVector", "true")) + + val benchmarkPT = new Benchmark("Float Sum with PassThrough cache", values, iters) + val rand1 = new Random(511) + val dfPassThrough = sqlContext.sparkContext.parallelize(0 to values - 1, 1) + .map(i => rand1.nextFloat()).toDF().cache() + dfPassThrough.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkPT.addCase(s"$str codegen") { iter => + withSQLConf(sqlContext, SQLConf. COLUMN_VECTOR_CODEGEN.key -> value) { + dfPassThrough.agg(sum("value")).collect + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Float Sum with PassThrough cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 476 / 483 66.1 15.1 1.0X + ColumnVector codegen 91 / 103 343.8 2.9 5.2X + */ + + benchmarkPT.run() + dfPassThrough.unpersist(true) + System.gc() + } + + def doubleSumBenchmark(sqlContext: SQLContext, values: Int, iters: Int = 5): Unit = { + import sqlContext.implicits._ + + val suites = Seq(("InternalRow", "false"), ("ColumnVector", "true")) + + val benchmarkPT = new Benchmark("Double Sum with PassThrough cache", values, iters) + val rand1 = new Random(511) + val dfPassThrough = sqlContext.sparkContext.parallelize(0 to values - 1, 1) + .map(i => rand1.nextDouble()).toDF().cache() + dfPassThrough.count() // force to create df.cache() + suites.foreach { + case (str, value) => + benchmarkPT.addCase(s"$str codegen") { iter => + withSQLConf(sqlContext, SQLConf. COLUMN_VECTOR_CODEGEN.key -> value) { + dfPassThrough.agg(sum("value")).collect + } + } + } + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64 + Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz + Double Sum with PassThrough cache: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + InternalRow codegen 290 / 306 54.3 18.4 1.0X + ColumnVector codegen 95 / 101 165.7 6.0 3.1X + */ + + benchmarkPT.run() + dfPassThrough.unpersist(true) + System.gc() + } + + def run(sqlContext: SQLContext, f: Int, d: Int): Unit = { + sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + + floatSumBenchmark(sqlContext, 1024 * 1024 * f) + doubleSumBenchmark(sqlContext, 1024 * 1024 * d) + } +} + +object DataFrameCacheBenchmark { + val F = 30 + val D = 15 + def main(args: Array[String]): Unit = { + val f = if (args.length > 0) args(0).toInt else F + val d = if (args.length > 1) args(1).toInt else D + val masterURL = if (args.length > 2) args(2) else "local[1]" + + val conf = new SparkConf() + val sc = new SparkContext(masterURL, "DataFrameCacheBenchmark", conf) + val sqlContext = new SQLContext(sc) + + val benchmark = new DataFrameCacheBenchmark + benchmark.run(sqlContext, f, d) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataFrameCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataFrameCacheSuite.scala new file mode 100644 index 000000000000..fd7e891d4d9f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataFrameCacheSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class DataFrameCacheSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("range/filter should be combined with column codegen") { + val df = sparkContext.parallelize(0 to 9, 1).map(i => i.toFloat).toDF().cache() + .filter("value = 1").selectExpr("value + 1") + assert(df.collect() === Array(Row(2.0))) + val plan = df.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].enableColumnCodeGen).isDefined) + } + + test("filters should be combined with column codegen") { + val df = sparkContext.parallelize(0 to 9, 1).map(i => i.toFloat).toDF().cache() + .filter("value % 2.0 == 0").filter("value % 3.0 == 0") + assert(df.collect() === Array(Row(0), Row(6.0))) + val plan = df.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].enableColumnCodeGen).isDefined) + } + + test("filter with null should be included in WholeStageCodegen with column codegen") { + val toFloat = udf[java.lang.Float, String] { s => if (s == "2") null else s.toFloat } + val df0 = sparkContext.parallelize(0 to 4, 1).map(i => i.toString).toDF() + val df = df0.withColumn("i", toFloat(df0("value"))).select("i").toDF().cache() + .filter("i % 2.0 == 0") + assert(df.collect() === Array(Row(0), Row(4.0))) + val plan = df.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].enableColumnCodeGen).isDefined) + } + + test("Aggregate should be included in WholeStageCodegen with column codegen") { + val df = sparkContext.parallelize(0 to 9, 1).map(i => i.toFloat).toDF().cache() + .groupBy().agg(max(col("value")), avg(col("value"))) + assert(df.collect() === Array(Row(9, 4.5))) + val plan = df.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].enableColumnCodeGen && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) + } + + test("Aggregate with grouping keys should be included in WholeStageCodegen with column codegen") { + val df = sparkContext.parallelize(0 to 2, 1).map(i => i.toFloat).toDF().cache() + .groupBy("value").count().orderBy("value") + assert(df.collect() === Array(Row(0.0, 1), Row(1.0, 1), Row(2.0, 1))) + val plan = df.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].enableColumnCodeGen && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) + } + + test("Aggregate with columns should be included in WholeStageCodegen with column codegen") { + val df = sparkContext.parallelize(0 to 10, 1).map(i => (i, (i * 2).toDouble)).toDF("i", "d") + .cache().agg(sum("d")) + assert(df.collect() === Array(Row(110.0))) + val plan = df.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].enableColumnCodeGen && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) + } + + test("Sort should be included in WholeStageCodegen without column codegen") { + val df = sparkContext.parallelize(Seq(3.toFloat, 2.toFloat, 1.toFloat), 1).toDF() + .sort(col("value")) + val plan = df.queryExecution.executedPlan + assert(df.collect() === Array(Row(1.0), Row(2.0), Row(3.0))) + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + !p.asInstanceOf[WholeStageCodegenExec].enableColumnCodeGen && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]).isDefined) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala index 830ca0294e1b..b0b611560eac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala @@ -26,13 +26,15 @@ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.AtomicType class DictionaryEncodingSuite extends SparkFunSuite { + val nullValue = -1 testDictionaryEncoding(new IntColumnStats, INT) testDictionaryEncoding(new LongColumnStats, LONG) - testDictionaryEncoding(new StringColumnStats, STRING) + testDictionaryEncoding(new StringColumnStats, STRING, false) def testDictionaryEncoding[T <: AtomicType]( columnStats: ColumnStats, - columnType: NativeColumnType[T]) { + columnType: NativeColumnType[T], + testDecompress: Boolean = true) { val typeName = columnType.getClass.getSimpleName.stripSuffix("$") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala index 988a577a7b4d..6d73c79b92e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.IntegralType class IntegralDeltaSuite extends SparkFunSuite { + val nullValue = -1 testIntegralDelta(new IntColumnStats, INT, IntDelta) testIntegralDelta(new LongColumnStats, LONG, LongDelta) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughSuite.scala new file mode 100644 index 000000000000..70d0590eabcd --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughSuite.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar.compression + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.types.{AtomicType, IntegralType} + +class PassThroughSuite extends SparkFunSuite { + val nullValue = -1 + testPassThrough(new FloatColumnStats, FLOAT) + testPassThrough(new DoubleColumnStats, DOUBLE) + + def testPassThrough[T <: AtomicType]( + columnStats: ColumnStats, + columnType: NativeColumnType[T]) { + + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + def skeleton(input: Seq[T#InternalType]) { + // ------------- + // Tests encoder + // ------------- + + val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough) + + input.map { value => + val row = new GenericMutableRow(1) + columnType.setField(row, 0, value) + builder.appendFrom(row, 0) + } + + val buffer = builder.build() + // Column type ID + null count + null positions + val headerSize = CompressionScheme.columnHeaderSize(buffer) + + // Compression scheme ID + compressed contents + val compressedSize = 4 + input.size * columnType.defaultSize + + // 4 extra bytes for compression scheme type ID + assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) + + buffer.position(headerSize) + assertResult(PassThrough.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + if (input.nonEmpty) { + input.foreach { value => + assertResult(value, "Wrong value")(columnType.extract(buffer)) + } + } + + // ------------- + // Tests decoder + // ------------- + + // Rewinds, skips column header and 4 more bytes for compression scheme ID + buffer.rewind().position(headerSize + 4) + + val decoder = PassThrough.decoder(buffer, columnType) + val mutableRow = new GenericMutableRow(1) + + if (input.nonEmpty) { + input.foreach{ + assert(decoder.hasNext) + assertResult(_, "Wrong decoded value") { + decoder.next(mutableRow, 0) + columnType.getField(mutableRow, 0) + } + } + } + assert(!decoder.hasNext) + } + + def skeletonForDecompress(input: Seq[T#InternalType]) { + val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough) + val row = new GenericMutableRow(1) + val nullRow = new GenericMutableRow(1) + nullRow.setNullAt(0) + input.map { value => + if (value == nullValue) { + builder.appendFrom(nullRow, 0) + } else { + columnType.setField(row, 0, value) + builder.appendFrom(row, 0) + } + } + val buffer = builder.build() + + // ---------------- + // Tests decompress + // ---------------- + // Rewinds, skips column header and 4 more bytes for compression scheme ID + val headerSize = CompressionScheme.columnHeaderSize(buffer) + buffer.position(headerSize) + assertResult(PassThrough.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + val decoder = PassThrough.decoder(buffer, columnType) + val (decodeBuffer, nullsBuffer) = decoder.decompress(input.length) + + if (input.nonEmpty) { + val numNulls = ByteBufferHelper.getInt(nullsBuffer) + var cntNulls = 0 + var nullPos = if (numNulls == 0) -1 else ByteBufferHelper.getInt(nullsBuffer) + input.zipWithIndex.foreach { + case (expected: Any, index: Int) if expected == nullValue => + assertResult(index, "Wrong null position") { + nullPos + } + decodeBuffer.position(decodeBuffer.position + columnType.defaultSize) + cntNulls += 1 + if (cntNulls < numNulls) { + nullPos = ByteBufferHelper.getInt(nullsBuffer) + } + case (expected: Byte, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded byte value") { + decodeBuffer.get() + } + case (expected: Short, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded short value") { + ByteBufferHelper.getShort(decodeBuffer) + } + case (expected: Int, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded int value") { + ByteBufferHelper.getInt(decodeBuffer) + } + case (expected: Long, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded long value") { + ByteBufferHelper.getLong(decodeBuffer) + } + case (expected: Float, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded float value") { + ByteBufferHelper.getFloat(decodeBuffer) + } + case (expected: Double, index: Int) => + assertResult(expected, s"Wrong ${index}-th decoded double value") { + ByteBufferHelper.getDouble(decodeBuffer) + } + case _ => fail("Unsupported type") + } + } + assert(!decodeBuffer.hasRemaining) + } + + test(s"$PassThrough with $typeName: empty column") { + skeleton(Seq.empty) + } + + test(s"$PassThrough with $typeName: long random series") { + val input = Array.fill[Any](10000)(makeRandomValue(columnType)) + skeleton(input.map(_.asInstanceOf[T#InternalType])) + } + + test(s"$PassThrough with $typeName: empty column for decompress()") { + skeletonForDecompress(Seq.empty) + } + + test(s"$PassThrough with $typeName: long random series for decompress()") { + val input = Array.fill[Any](10000)(makeRandomValue(columnType)) + skeletonForDecompress(input.map(_.asInstanceOf[T#InternalType])) + } + + test(s"$PassThrough with $typeName: simple case with null for decompress()") { + val input = columnType match { + case FLOAT => Seq(2: Float, 1: Float, 2: Float, nullValue: Float, 5: Float) + case DOUBLE => Seq(2: Double, 1: Double, 2: Double, nullValue: Double, 5: Double) + } + + skeletonForDecompress(input.map(_.asInstanceOf[T#InternalType])) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala index 95642e93ae9f..76156e508053 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala @@ -24,16 +24,18 @@ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.AtomicType class RunLengthEncodingSuite extends SparkFunSuite { + val nullValue = -1 testRunLengthEncoding(new NoopColumnStats, BOOLEAN) testRunLengthEncoding(new ByteColumnStats, BYTE) testRunLengthEncoding(new ShortColumnStats, SHORT) testRunLengthEncoding(new IntColumnStats, INT) testRunLengthEncoding(new LongColumnStats, LONG) - testRunLengthEncoding(new StringColumnStats, STRING) + testRunLengthEncoding(new StringColumnStats, STRING, false) def testRunLengthEncoding[T <: AtomicType]( columnStats: ColumnStats, - columnType: NativeColumnType[T]) { + columnType: NativeColumnType[T], + testDecompress: Boolean = true) { val typeName = columnType.getClass.getSimpleName.stripSuffix("$")