From 7c158bd137f057453d17ef360906e5be90bf5004 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Thu, 31 Mar 2016 14:15:34 -0700 Subject: [PATCH 01/13] [SPARK-14394] --- .../aggregate/TungstenAggregateHashMap.scala | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala new file mode 100644 index 0000000000000..d08dece93e44f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.aggregate + +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.StructType + +class TungstenAggregateHashMap( + ctx: CodegenContext, + generatedClassName: String, + groupingKeySchema: StructType, + bufferSchema: StructType) { + val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, ctx.freshName("key"))) + val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), key.dataType.typeName)) + val groupingKeySignature = groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ") + + def generateAggregateHashMap(): String = { + + s""" + |public class $generatedClassName { + |${initializeAggregateHashMap()} + | + |${generateFindOrInsert()} + | + |${generateEquals()} + | + |${generateHashFunction()} + |} + """.stripMargin + } + + def initializeAggregateHashMap(): String = { + val generatedSchema: String = + s""" + |new org.apache.spark.sql.types.StructType() + |${(groupingKeySchema ++ bufferSchema).map(key => + s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") + .mkString("\n")}; + """.stripMargin + + s""" + | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | private int[] buckets; + | private int numBuckets; + | private int maxSteps; + | private int numRows = 0; + | private org.apache.spark.sql.types.StructType schema = $generatedSchema + | + | public $generatedClassName(int capacity, double loadFactor, int maxSteps) { + | assert (capacity > 0 && ((capacity & (capacity - 1)) == 0)); + | this.maxSteps = maxSteps; + | numBuckets = (int) (capacity / loadFactor); + | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, + | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); + | buckets = new int[numBuckets]; + | java.util.Arrays.fill(buckets, -1); + | } + | + | public $generatedClassName() { + | new 
$generatedClassName(1 << 16, 0.25, 5); + | } + """.stripMargin + } + + def generateHashFunction(): String = { + s""" + |// TODO: Improve this Hash Function + |private long hash($groupingKeySignature) { + | return ${groupingKeys.map(_._2).mkString(" & ")}; + |} + """.stripMargin + } + + def generateEquals(): String = { + s""" + |private boolean equals(int idx, $groupingKeySignature) { + | return ${groupingKeys.zipWithIndex.map(key => + s"batch.column(${key._2}).getLong(buckets[idx]) == ${key._1._2}").mkString(" && ")}; + |} + """.stripMargin + } + + def generateFindOrInsert(): String = { + s""" + |public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(${ + groupingKeySignature}) { + | int idx = find(${groupingKeys.map(_._2).mkString(", ")}); + | if (idx != -1 && buckets[idx] == -1) { + | ${groupingKeys.zipWithIndex.map(key => + s"batch.column(${key._2}).putLong(numRows, ${key._1._2});").mkString("\n")} + | ${bufferValues.zipWithIndex.map(key => + s"batch.column(${groupingKeys.length + key._2}).putLong(numRows, 0);") + .mkString("\n")} + | buckets[idx] = numRows++; + | } + | return batch.getRow(buckets[idx]); + |} + | + |private int find($groupingKeySignature) { + | long h = hash(${groupingKeys.map(_._2).mkString(", ")}); + | int step = 0; + | int idx = (int) h & (numBuckets - 1); + | while (step < maxSteps) { + | // Return bucket index if it's either an empty slot or already contains the key + | if (buckets[idx] == -1) { + | return idx; + | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { + | return idx; + | } + | idx = (idx + 1) & (numBuckets - 1); + | step++; + | } + |// Didn't find it + |return -1; + |} + """.stripMargin + } +} From ebaea6a87b704afedd47bdd2dd17c92c3ffc6e8e Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 6 Apr 2016 17:37:08 -0700 Subject: [PATCH 02/13] Integrating AggregateHashMap for Aggregates with Group By --- .../ColumnarAggMapCodeGenerator.scala | 19 ++- .../aggregate/TungstenAggregate.scala | 78 ++++++++-- .../aggregate/TungstenAggregateHashMap.scala | 133 ------------------ .../BenchmarkWholeStageCodegen.scala | 5 +- 4 files changed, 75 insertions(+), 160 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala index e415dd8e6ac9f..84d0b2b6e5f8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala @@ -66,26 +66,25 @@ class ColumnarAggMapCodeGenerator( """.stripMargin s""" - | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | public org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; | private int[] buckets; | private int numBuckets; | private int maxSteps; | private int numRows = 0; | private org.apache.spark.sql.types.StructType schema = $generatedSchema | - | public $generatedClassName(int capacity, double loadFactor, int maxSteps) { - | assert (capacity > 0 && ((capacity & (capacity - 1)) == 0)); - | this.maxSteps = maxSteps; - | numBuckets = (int) (capacity / loadFactor); + | public $generatedClassName() { + | int DEFAULT_CAPACITY = 1 << 16; + | double DEFAULT_LOAD_FACTOR = 0.25; + | int DEFAULT_MAX_STEPS = 5; + | assert 
(DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0)); + | this.maxSteps = DEFAULT_MAX_STEPS; + | numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR); | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, - | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); + | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); | buckets = new int[numBuckets]; | java.util.Arrays.fill(buckets, -1); | } - | - | public $generatedClassName() { - | new $generatedClassName(1 << 16, 0.25, 5); - | } """.stripMargin } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 253592028c7f9..eaca2b9ba88e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -261,6 +261,10 @@ case class TungstenAggregate( .map(_.asInstanceOf[DeclarativeAggregate]) private val bufferSchema = StructType.fromAttributes(aggregateBufferAttributes) + // The name for AggregateHashMap + private var aggregateHashMapTerm: String = _ + private var isAggregateHashMapEnabled: Boolean = true + // The name for HashMap private var hashMapTerm: String = _ private var sorterTerm: String = _ @@ -437,17 +441,21 @@ case class TungstenAggregate( val initAgg = ctx.freshName("initAgg") ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") - // create AggregateHashMap - val isAggregateHashMapEnabled: Boolean = false - val isAggregateHashMapSupported: Boolean = + // We currently only enable aggregate hashmap for long key/value types + isAggregateHashMapEnabled = isAggregateHashMapEnabled && (groupingKeySchema ++ bufferSchema).forall(_.dataType == LongType) - val aggregateHashMapTerm = ctx.freshName("aggregateHashMap") + aggregateHashMapTerm = ctx.freshName("aggregateHashMap") val aggregateHashMapClassName = ctx.freshName("GeneratedAggregateHashMap") val aggregateHashMapGenerator = new ColumnarAggMapCodeGenerator(ctx, aggregateHashMapClassName, groupingKeySchema, bufferSchema) - if (isAggregateHashMapEnabled && isAggregateHashMapSupported) { + // Create a name for iterator from AggregateHashMap + val iterTermForGeneratedHashMap = ctx.freshName("genMapIter") + if (isAggregateHashMapEnabled) { ctx.addMutableState(aggregateHashMapClassName, aggregateHashMapTerm, s"$aggregateHashMapTerm = new $aggregateHashMapClassName();") + ctx.addMutableState( + "java.util.Iterator", + iterTermForGeneratedHashMap, "") } // create hashMap @@ -465,11 +473,14 @@ case class TungstenAggregate( val doAgg = ctx.freshName("doAggregateWithKeys") ctx.addNewFunction(doAgg, s""" - ${if (isAggregateHashMapSupported) aggregateHashMapGenerator.generate() else ""} + ${if (isAggregateHashMapEnabled) aggregateHashMapGenerator.generate() else ""} private void $doAgg() throws java.io.IOException { $hashMapTerm = $thisPlan.createHashMap(); ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} + ${if (isAggregateHashMapEnabled) { + s"$iterTermForGeneratedHashMap = $aggregateHashMapTerm.batch.rowIterator();"} else ""} + $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm); } """) @@ -491,6 +502,19 @@ case class TungstenAggregate( } // output the result + ${if (isAggregateHashMapEnabled) { + s"""while ($iterTermForGeneratedHashMap.hasNext()) { + $numOutput.add(1); + 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row row = + (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row) + $iterTermForGeneratedHashMap.next(); + append(row.copy()); + + if (shouldStop()) return; + } + + $aggregateHashMapTerm.batch.close();"""} else ""} + while ($iterTerm.next()) { $numOutput.add(1); UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey(); @@ -513,8 +537,11 @@ case class TungstenAggregate( ctx.currentVars = input val keyCode = GenerateUnsafeProjection.createCode( ctx, groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output))) + val groupByKeys = ctx.generateExpressions( + groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output))) val key = keyCode.value val buffer = ctx.freshName("aggBuffer") + val aggregateRow = ctx.freshName("aggregateRow") // only have DeclarativeAggregate val updateExpr = aggregateExpressions.flatMap { e => @@ -533,15 +560,22 @@ case class TungstenAggregate( val inputAttr = aggregateBufferAttributes ++ child.output ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input + ctx.INPUT_ROW = aggregateRow + // TODO: support subexpression elimination + val aggregateRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) + val updateAggregateRow = aggregateRowEvals.zipWithIndex.map { case (ev, i) => + val dt = updateExpr(i).dataType + ctx.updateColumn(aggregateRow, dt, groupingKeySchema.length + i, ev, updateExpr(i).nullable) + } ctx.INPUT_ROW = buffer // TODO: support subexpression elimination val evals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) - val updates = evals.zipWithIndex.map { case (ev, i) => + val updateAggregateBuffer = evals.zipWithIndex.map { case (ev, i) => val dt = updateExpr(i).dataType ctx.updateColumn(buffer, dt, i, ev, updateExpr(i).nullable) } - val (checkFallback, resetCoulter, incCounter) = if (testFallbackStartsAt.isDefined) { + val (checkFallback, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { val countTerm = ctx.freshName("fallbackCounter") ctx.addMutableState("int", countTerm, s"$countTerm = 0;") (s"$countTerm < ${testFallbackStartsAt.get}", s"$countTerm = 0;", s"$countTerm += 1;") @@ -558,17 +592,24 @@ case class TungstenAggregate( ${keyCode.code.trim} ${hashEval.code.trim} UnsafeRow $buffer = null; + org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $aggregateRow = null; if ($checkFallback) { + ${if (isAggregateHashMapEnabled) { + s"""$aggregateRow = + $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")});""" + } else ""} // try to get the buffer from hash map - $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); + if ($aggregateRow == null) { + $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); + } } - if ($buffer == null) { + if ($aggregateRow == null && $buffer == null) { if ($sorterTerm == null) { $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter(); } else { $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter()); } - $resetCoulter + $resetCounter // the hash map had be spilled, it should have enough memory now, // try to allocate buffer again. 
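        // (if the buffer is still null after the spill, the map could not even
        // allocate its first page, so the OutOfMemoryError below is unavoidable)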
$buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); @@ -579,10 +620,17 @@ case class TungstenAggregate( } $incCounter - // evaluate aggregate function - ${evaluateVariables(evals)} - // update aggregate buffer - ${updates.mkString("\n").trim} + if ($aggregateRow != null) { + // evaluate aggregate function + ${evaluateVariables(aggregateRowEvals)} + // update aggregate row + ${updateAggregateRow.mkString("\n").trim} + } else { + // evaluate aggregate function + ${evaluateVariables(evals)} + // update aggregate buffer + ${updateAggregateBuffer.mkString("\n").trim} + } """ } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala deleted file mode 100644 index d08dece93e44f..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.aggregate - -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -import org.apache.spark.sql.types.StructType - -class TungstenAggregateHashMap( - ctx: CodegenContext, - generatedClassName: String, - groupingKeySchema: StructType, - bufferSchema: StructType) { - val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, ctx.freshName("key"))) - val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), key.dataType.typeName)) - val groupingKeySignature = groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ") - - def generateAggregateHashMap(): String = { - - s""" - |public class $generatedClassName { - |${initializeAggregateHashMap()} - | - |${generateFindOrInsert()} - | - |${generateEquals()} - | - |${generateHashFunction()} - |} - """.stripMargin - } - - def initializeAggregateHashMap(): String = { - val generatedSchema: String = - s""" - |new org.apache.spark.sql.types.StructType() - |${(groupingKeySchema ++ bufferSchema).map(key => - s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") - .mkString("\n")}; - """.stripMargin - - s""" - | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; - | private int[] buckets; - | private int numBuckets; - | private int maxSteps; - | private int numRows = 0; - | private org.apache.spark.sql.types.StructType schema = $generatedSchema - | - | public $generatedClassName(int capacity, double loadFactor, int maxSteps) { - | assert (capacity > 0 && ((capacity & (capacity - 1)) == 0)); - | this.maxSteps = maxSteps; - | numBuckets = (int) (capacity / loadFactor); - | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, - | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); - | buckets = new int[numBuckets]; - | java.util.Arrays.fill(buckets, -1); - | } - | - | public $generatedClassName() { - | new $generatedClassName(1 << 16, 0.25, 5); - | } - """.stripMargin - } - - def generateHashFunction(): String = { - s""" - |// TODO: Improve this Hash Function - |private long hash($groupingKeySignature) { - | return ${groupingKeys.map(_._2).mkString(" & ")}; - |} - """.stripMargin - } - - def generateEquals(): String = { - s""" - |private boolean equals(int idx, $groupingKeySignature) { - | return ${groupingKeys.zipWithIndex.map(key => - s"batch.column(${key._2}).getLong(buckets[idx]) == ${key._1._2}").mkString(" && ")}; - |} - """.stripMargin - } - - def generateFindOrInsert(): String = { - s""" - |public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(${ - groupingKeySignature}) { - | int idx = find(${groupingKeys.map(_._2).mkString(", ")}); - | if (idx != -1 && buckets[idx] == -1) { - | ${groupingKeys.zipWithIndex.map(key => - s"batch.column(${key._2}).putLong(numRows, ${key._1._2});").mkString("\n")} - | ${bufferValues.zipWithIndex.map(key => - s"batch.column(${groupingKeys.length + key._2}).putLong(numRows, 0);") - .mkString("\n")} - | buckets[idx] = numRows++; - | } - | return batch.getRow(buckets[idx]); - |} - | - |private int find($groupingKeySignature) { - | long h = hash(${groupingKeys.map(_._2).mkString(", ")}); - | int step = 0; - | int idx = (int) h & (numBuckets - 1); - | while (step < maxSteps) { - | // Return bucket index if it's either an empty slot or already contains the key - | if (buckets[idx] == -1) { - | return idx; - | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { - | return idx; - | } - | idx = (idx + 1) & 
(numBuckets - 1); - | step++; - | } - |// Didn't find it - |return -1; - |} - """.stripMargin - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 352fd07d0e8b0..0595227fd8bc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -150,11 +150,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } - ignore("aggregate with keys") { + test("aggregate with keys") { val N = 20 << 20 runBenchmark("Aggregate w keys", N) { - sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() + sqlContext.range(N).selectExpr("id", "(id & 65535) as k1", "(id & 65535) as k2") + .groupBy("k1", "k2").sum("id", "k1").collect() //.foreach(println) } /* From cee7e65b3cf7569b4e46941158f164c2130c3981 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Tue, 12 Apr 2016 10:33:42 -0700 Subject: [PATCH 03/13] Add SQLConf --- .../sql/execution/WholeStageCodegen.scala | 3 +- .../ColumnarAggMapCodeGenerator.scala | 24 +- .../aggregate/TungstenAggregate.scala | 75 ++++-- .../apache/spark/sql/internal/SQLConf.scala | 8 + .../BenchmarkWholeStageCodegen.scala | 239 ++++++++++-------- 5 files changed, 220 insertions(+), 129 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 447dbe701815b..d8a0babe8b1c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -126,6 +126,7 @@ trait CodegenSupport extends SparkPlan { // outputVars will be used to generate the code for UnsafeRow, so we should copy them outputVars.map(_.copy()) } + val rowVar = if (row != null) { ExprCode("", "false", row) } else { @@ -338,7 +339,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup // try to compile, helpful for debug val cleanedSource = CodeFormatter.stripExtraNewLines(source) - logDebug(s"\n${CodeFormatter.format(cleanedSource)}") + // println(s"\n${CodeFormatter.format(cleanedSource)}") CodeGenerator.compile(cleanedSource) (ctx, cleanedSource) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala index 84d0b2b6e5f8e..cf27cb1eb3ec2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala @@ -65,23 +65,38 @@ class ColumnarAggMapCodeGenerator( .mkString("\n")}; """.stripMargin + val generatedSchema2: String = + s""" + |new org.apache.spark.sql.types.StructType() + |${(bufferSchema).map(key => + s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") + .mkString("\n")}; + """.stripMargin + s""" | public org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | public org.apache.spark.sql.execution.vectorized.ColumnarBatch batch2; | private int[] buckets; | private int numBuckets; | private int maxSteps; | private int numRows = 0; | private org.apache.spark.sql.types.StructType schema = 
$generatedSchema + | private org.apache.spark.sql.types.StructType schema2 = $generatedSchema2 | | public $generatedClassName() { | int DEFAULT_CAPACITY = 1 << 16; | double DEFAULT_LOAD_FACTOR = 0.25; - | int DEFAULT_MAX_STEPS = 5; + | int DEFAULT_MAX_STEPS = 2; | assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0)); | this.maxSteps = DEFAULT_MAX_STEPS; | numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR); | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); + | batch2 = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema2, + | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); + | for (int i = 0 ; i < batch2.numCols(); i++) { + | batch2.setColumn(i, batch.column(i+${groupingKeys.length})); + | } | buckets = new int[numBuckets]; | java.util.Arrays.fill(buckets, -1); | } @@ -102,7 +117,7 @@ class ColumnarAggMapCodeGenerator( s""" |// TODO: Improve this hash function |private long hash($groupingKeySignature) { - | return ${groupingKeys.map(_._2).mkString(" ^ ")}; + | return ${groupingKeys.map(_._2).mkString(" | ")}; |} """.stripMargin } @@ -177,9 +192,10 @@ class ColumnarAggMapCodeGenerator( s"batch.column(${groupingKeys.length + k._2}).putLong(numRows, 0);") .mkString("\n")} | buckets[idx] = numRows++; - | return batch.getRow(buckets[idx]); + | batch.setNumRows(numRows); + | return batch2.getRow(buckets[idx]); | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { - | return batch.getRow(buckets[idx]); + | return batch2.getRow(buckets[idx]); | } | idx = (idx + 1) & (numBuckets - 1); | step++; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index eaca2b9ba88e0..ba9edc76a0a60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -263,11 +263,12 @@ case class TungstenAggregate( // The name for AggregateHashMap private var aggregateHashMapTerm: String = _ - private var isAggregateHashMapEnabled: Boolean = true + private var isAggregateHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled // The name for HashMap private var hashMapTerm: String = _ private var sorterTerm: String = _ + private var abc: String = _ /** * This is called by generated Java class, should be public. 
@@ -409,10 +410,15 @@ case class TungstenAggregate( BindReferences.bindReference(e, inputAttrs).gen(ctx) } s""" + // 1 $evaluateKeyVars + // 2 $evaluateBufferVars + // 3 $evaluateAggResults + // 4 ${consume(ctx, resultVars)} + // 5 """ } else if (modes.contains(Partial) || modes.contains(PartialMerge)) { @@ -503,17 +509,46 @@ case class TungstenAggregate( // output the result ${if (isAggregateHashMapEnabled) { - s"""while ($iterTermForGeneratedHashMap.hasNext()) { - $numOutput.add(1); - org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row row = - (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row) - $iterTermForGeneratedHashMap.next(); - append(row.copy()); + val row = ctx.freshName("aggregateHashmapRow") + var schema: StructType = groupingKeySchema + bufferSchema.foreach(i => schema = schema.add(i)) + val aggregateRow = ctx.freshName("aggregateRow") + ctx.addMutableState("UnsafeProjection", aggregateRow, "") - if (shouldStop()) return; - } - - $aggregateHashMapTerm.batch.close();"""} else ""} + ctx.currentVars = null + ctx.INPUT_ROW = row + val code = GenerateUnsafeProjection.createCode(ctx, + schema.toAttributes.zipWithIndex.map { case (attr, i) => + BoundReference(i, attr.dataType, attr.nullable) + }) + + UnsafeProjection.create(schema) + val x = ctx.freshName("x") + // ctx.INPUT_ROW = row + // ctx.currentVars = null +/* + val eval = resultExpressions.map { e => + BindReferences.bindReference(e, groupingAttributes).gen(ctx) + } +*/ + s""" + while ($iterTermForGeneratedHashMap.hasNext()) { + $numOutput.add(1); + org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $row = + (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row) + $iterTermForGeneratedHashMap.next(); + ${code.code} + // UnsafeRow $x = aggregateRow.apply($row); + ${consume(ctx, Seq.empty, {code.value})} + // append(row.copy()); + + if (shouldStop()) return; + } + + $aggregateHashMapTerm.batch.close(); + """ + } else "" + } while ($iterTerm.next()) { $numOutput.add(1); @@ -561,11 +596,19 @@ case class TungstenAggregate( val inputAttr = aggregateBufferAttributes ++ child.output ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input ctx.INPUT_ROW = aggregateRow + var schema: StructType = groupingKeySchema + bufferSchema.foreach(i => schema = schema.add(i)) + // ctx.currentVars = null + val code = GenerateUnsafeProjection.createCode(ctx, + schema.toAttributes.zipWithIndex.map { case (attr, i) => + BoundReference(i, attr.dataType, attr.nullable) + }) + // TODO: support subexpression elimination val aggregateRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) val updateAggregateRow = aggregateRowEvals.zipWithIndex.map { case (ev, i) => val dt = updateExpr(i).dataType - ctx.updateColumn(aggregateRow, dt, groupingKeySchema.length + i, ev, updateExpr(i).nullable) + ctx.updateColumn(aggregateRow, dt, i, ev, updateExpr(i).nullable) } ctx.INPUT_ROW = buffer // TODO: support subexpression elimination @@ -588,18 +631,18 @@ case class TungstenAggregate( // continue to do in-memory aggregation and spilling until all the rows had been processed. // Finally, sort the spilled aggregate buffers by key, and merge them together for same key. 
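    // When the columnar aggregate map is enabled, the generated code below first
    // probes that map via findOrInsert(), then the regular BytesToBytesMap, and
    // only falls back to the spill/sort path when both probes come up empty.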
s""" - // generate grouping key - ${keyCode.code.trim} - ${hashEval.code.trim} UnsafeRow $buffer = null; org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $aggregateRow = null; if ($checkFallback) { ${if (isAggregateHashMapEnabled) { s"""$aggregateRow = - $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")});""" + $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")});""" } else ""} // try to get the buffer from hash map if ($aggregateRow == null) { + // generate grouping key + ${keyCode.code.trim} + ${hashEval.code.trim} $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e74fb00cb26c0..343b95b6a9a44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -437,6 +437,12 @@ object SQLConf { .stringConf .createOptional + val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled") + .internal() + .doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.") + .booleanConf + .createWithDefault(false) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" val EXTERNAL_SORT = "spark.sql.planner.externalSort" @@ -561,6 +567,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES) + def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED) + override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 0595227fd8bc0..f63f7808148d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -124,40 +124,63 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { /** - Using ImperativeAggregate (as implemented in Spark 1.6): + * Using ImperativeAggregate (as implemented in Spark 1.6): + ** + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate + *------------------------------------------------------------------------------- + *stddev w/o codegen 2019.04 10.39 1.00 X + *stddev w codegen 2097.29 10.00 0.96 X + *kurtosis w/o codegen 2108.99 9.94 0.96 X + *kurtosis w codegen 2090.69 10.03 0.97 X + ** + *Using DeclarativeAggregate: + ** + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *stddev codegen=false 5630 / 5776 18.0 55.6 1.0X + *stddev codegen=true 1259 / 1314 83.0 12.0 4.5X + ** + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X + *kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X + */ + } - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - stddev: Avg Time(ms) Avg Rate(M/s) 
Relative Rate - ------------------------------------------------------------------------------- - stddev w/o codegen 2019.04 10.39 1.00 X - stddev w codegen 2097.29 10.00 0.96 X - kurtosis w/o codegen 2108.99 9.94 0.96 X - kurtosis w codegen 2090.69 10.03 0.97 X + test("aggregate with keys") { + val N = 20 << 20 - Using DeclarativeAggregate: + val benchmark = new Benchmark("Aggregate w keys", N) + // def f(): Unit = sqlContext.range(N).selectExpr("id", "(id & 65535) as k1", "(id & 65535) as k2").groupBy("k1", "k2").sum("id").collect() + // .foreach(println) + def f(): Unit = sqlContext.range(N).selectExpr("id", "(id & 65535) as k1").groupBy("k1").sum("id").collect() - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - stddev codegen=false 5630 / 5776 18.0 55.6 1.0X - stddev codegen=true 1259 / 1314 83.0 12.0 4.5X + //def f(): Unit = sqlContext.range(N).selectExpr("id").groupBy().sum("id").collect() - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X - kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X - */ - } +/* + benchmark.addCase(s"codegen=false") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + f() + } - test("aggregate with keys") { - val N = 20 << 20 + benchmark.addCase(s"codegen=true hashmap=false") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + f() + } +*/ - runBenchmark("Aggregate w keys", N) { - sqlContext.range(N).selectExpr("id", "(id & 65535) as k1", "(id & 65535) as k2") - .groupBy("k1", "k2").sum("id", "k1").collect() //.foreach(println) + benchmark.addCase(s"codegen=true hashmap=true") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + f() } + benchmark.run() + /* Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative @@ -191,12 +214,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w long duplicated codegen=false 3446 / 3478 6.1 164.3 1.0X - Join w long duplicated codegen=true 322 / 351 65.2 15.3 10.7X + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *Join w long duplicated codegen=false 3446 / 3478 6.1 164.3 1.0X + *Join w long duplicated codegen=true 322 / 351 65.2 15.3 10.7X */ val dim2 = broadcast(sqlContext.range(M) @@ -209,12 +232,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - 
------------------------------------------------------------------------------------------- - Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X - Join w 2 ints codegen=true 791 / 818 26.5 37.7 5.6X + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X + *Join w 2 ints codegen=true 791 / 818 26.5 37.7 5.6X */ val dim3 = broadcast(sqlContext.range(M) @@ -227,12 +250,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w 2 longs codegen=false 5905 / 6123 3.6 281.6 1.0X - Join w 2 longs codegen=true 2230 / 2529 9.4 106.3 2.6X + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *Join w 2 longs codegen=false 5905 / 6123 3.6 281.6 1.0X + *Join w 2 longs codegen=true 2230 / 2529 9.4 106.3 2.6X */ val dim4 = broadcast(sqlContext.range(M) @@ -245,11 +268,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X - Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X + *Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X */ runBenchmark("outer join w long", N) { @@ -257,12 +280,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X - outer join w long codegen=true 261 / 276 80.5 12.4 11.7X + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X + *outer join w long codegen=true 261 / 276 80.5 12.4 11.7X */ runBenchmark("semi join w long", N) { @@ -270,12 +293,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - semi join w long: Best/Avg Time(ms) Rate(M/s) Per 
Row(ns) Relative - ------------------------------------------------------------------------------------------- - semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X - semi join w long codegen=true 237 / 244 88.3 11.3 8.1X + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X + *semi join w long codegen=true 237 / 244 88.3 11.3 8.1X */ } @@ -288,11 +311,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - merge join codegen=false 1588 / 1880 1.3 757.1 1.0X - merge join codegen=true 1477 / 1531 1.4 704.2 1.1X + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *merge join codegen=false 1588 / 1880 1.3 757.1 1.0X + *merge join codegen=true 1477 / 1531 1.4 704.2 1.1X */ runBenchmark("sort merge join", N) { @@ -304,11 +327,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X - sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X + *sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X */ } @@ -324,12 +347,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - shuffle hash join codegen=false 1101 / 1391 3.8 262.6 1.0X - shuffle hash join codegen=true 528 / 578 7.9 125.8 2.1X + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *shuffle hash join codegen=false 1101 / 1391 3.8 262.6 1.0X + *shuffle hash join codegen=true 528 / 578 7.9 125.8 2.1X */ } @@ -342,11 +365,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - cube codegen=false 3188 / 3392 1.6 608.2 1.0X - cube codegen=true 1239 / 1394 4.2 236.3 2.6X + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + 
*------------------------------------------------------------------------------------------- + *cube codegen=false 3188 / 3392 1.6 608.2 1.0X + *cube codegen=true 1239 / 1394 4.2 236.3 2.6X */ } @@ -598,21 +621,21 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - UnsafeRow hash 267 / 284 78.4 12.8 1.0X - murmur3 hash 102 / 129 205.5 4.9 2.6X - fast hash 79 / 96 263.8 3.8 3.4X - arrayEqual 164 / 172 128.2 7.8 1.6X - Java HashMap (Long) 321 / 399 65.4 15.3 0.8X - Java HashMap (two ints) 328 / 363 63.9 15.7 0.8X - Java HashMap (UnsafeRow) 1140 / 1200 18.4 54.3 0.2X - LongToUnsafeRowMap (opt=false) 378 / 400 55.5 18.0 0.7X - LongToUnsafeRowMap (opt=true) 144 / 152 145.2 6.9 1.9X - BytesToBytesMap (off Heap) 1300 / 1616 16.1 62.0 0.2X - BytesToBytesMap (on Heap) 1165 / 1202 18.0 55.5 0.2X - Aggregate HashMap 121 / 131 173.3 5.8 2.2X + *Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + *BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *UnsafeRow hash 267 / 284 78.4 12.8 1.0X + *murmur3 hash 102 / 129 205.5 4.9 2.6X + *fast hash 79 / 96 263.8 3.8 3.4X + *arrayEqual 164 / 172 128.2 7.8 1.6X + *Java HashMap (Long) 321 / 399 65.4 15.3 0.8X + *Java HashMap (two ints) 328 / 363 63.9 15.7 0.8X + *Java HashMap (UnsafeRow) 1140 / 1200 18.4 54.3 0.2X + *LongToUnsafeRowMap (opt=false) 378 / 400 55.5 18.0 0.7X + *LongToUnsafeRowMap (opt=true) 144 / 152 145.2 6.9 1.9X + *BytesToBytesMap (off Heap) 1300 / 1616 16.1 62.0 0.2X + *BytesToBytesMap (on Heap) 1165 / 1202 18.0 55.5 0.2X + *Aggregate HashMap 121 / 131 173.3 5.8 2.2X */ benchmark.run() } @@ -633,12 +656,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - collect 1 million 439 / 654 2.4 418.7 1.0X - collect 2 millions 961 / 1907 1.1 916.4 0.5X - collect 4 millions 3193 / 3895 0.3 3044.7 0.1X + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *collect 1 million 439 / 654 2.4 418.7 1.0X + *collect 2 millions 961 / 1907 1.1 916.4 0.5X + *collect 4 millions 3193 / 3895 0.3 3044.7 0.1X */ } @@ -655,11 +678,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() /** - model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) - collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - collect limit 1 million 833 / 1284 1.3 794.4 1.0X - collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X + *model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) + *collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *collect limit 1 million 833 / 1284 1.3 794.4 1.0X + *collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X */ } } From 8c9e17a1d40e3014e39b1d04f3a458aa129784f8 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Tue, 12 Apr 2016 16:01:03 -0700 
Subject: [PATCH 04/13] 20ns --- .../sql/execution/WholeStageCodegen.scala | 2 +- .../aggregate/TungstenAggregate.scala | 30 +++++++++---------- .../BenchmarkWholeStageCodegen.scala | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index d8a0babe8b1c4..f94db340333ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -339,7 +339,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup // try to compile, helpful for debug val cleanedSource = CodeFormatter.stripExtraNewLines(source) - // println(s"\n${CodeFormatter.format(cleanedSource)}") + println(s"\n${CodeFormatter.format(cleanedSource)}") CodeGenerator.compile(cleanedSource) (ctx, cleanedSource) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index ba9edc76a0a60..e5f8643f1d517 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -644,21 +644,21 @@ case class TungstenAggregate( ${keyCode.code.trim} ${hashEval.code.trim} $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); - } - } - if ($aggregateRow == null && $buffer == null) { - if ($sorterTerm == null) { - $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter(); - } else { - $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter()); - } - $resetCounter - // the hash map had be spilled, it should have enough memory now, - // try to allocate buffer again. - $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); - if ($buffer == null) { - // failed to allocate the first page - throw new OutOfMemoryError("No enough memory for aggregation"); + if ($buffer == null) { + | if ($sorterTerm == null) { + | $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter(); + | } else { + | $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter()); + | } + | $resetCounter + | // the hash map had be spilled, it should have enough memory now, + | // try to allocate buffer again. 
+ | $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); + | if ($buffer == null) { + | // failed to allocate the first page + | throw new OutOfMemoryError("No enough memory for aggregation"); + | } + | } } } $incCounter diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index f63f7808148d4..50bcf5b7dd981 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -165,13 +165,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { sqlContext.setConf("spark.sql.codegen.wholeStage", "false") f() } + */ benchmark.addCase(s"codegen=true hashmap=false") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") f() } -*/ benchmark.addCase(s"codegen=true hashmap=true") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") From 3379294b76d91a55dbe86e31efb9812c8d37768c Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Tue, 12 Apr 2016 16:18:36 -0700 Subject: [PATCH 05/13] generated code --- .../sql/execution/WholeStageCodegen.scala | 2 +- .../ColumnarAggMapCodeGenerator.scala | 22 +-- .../aggregate/TungstenAggregate.scala | 165 +++++++++--------- 3 files changed, 92 insertions(+), 97 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index f94db340333ad..29acc38ab3584 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -339,7 +339,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup // try to compile, helpful for debug val cleanedSource = CodeFormatter.stripExtraNewLines(source) - println(s"\n${CodeFormatter.format(cleanedSource)}") + logDebug(s"\n${CodeFormatter.format(cleanedSource)}") CodeGenerator.compile(cleanedSource) (ctx, cleanedSource) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala index cf27cb1eb3ec2..954caa0cdbaee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala @@ -65,25 +65,27 @@ class ColumnarAggMapCodeGenerator( .mkString("\n")}; """.stripMargin - val generatedSchema2: String = + val generatedAggBufferSchema: String = s""" |new org.apache.spark.sql.types.StructType() - |${(bufferSchema).map(key => + |${bufferSchema.map(key => s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") .mkString("\n")}; """.stripMargin s""" | public org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; - | public org.apache.spark.sql.execution.vectorized.ColumnarBatch batch2; + | public org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch; | private int[] buckets; | private int numBuckets; | private int maxSteps; | private int numRows = 0; | private org.apache.spark.sql.types.StructType schema = $generatedSchema - | 
private org.apache.spark.sql.types.StructType schema2 = $generatedSchema2 + | private org.apache.spark.sql.types.StructType aggregateBufferSchema = + | $generatedAggBufferSchema | | public $generatedClassName() { + | // TODO: These should be generated based on the schema | int DEFAULT_CAPACITY = 1 << 16; | double DEFAULT_LOAD_FACTOR = 0.25; | int DEFAULT_MAX_STEPS = 2; @@ -92,10 +94,10 @@ class ColumnarAggMapCodeGenerator( | numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR); | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); - | batch2 = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema2, - | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); - | for (int i = 0 ; i < batch2.numCols(); i++) { - | batch2.setColumn(i, batch.column(i+${groupingKeys.length})); + | aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate( + | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); + | for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) { + | aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length})); | } | buckets = new int[numBuckets]; | java.util.Arrays.fill(buckets, -1); @@ -193,9 +195,9 @@ class ColumnarAggMapCodeGenerator( .mkString("\n")} | buckets[idx] = numRows++; | batch.setNumRows(numRows); - | return batch2.getRow(buckets[idx]); + | return aggregateBufferBatch.getRow(buckets[idx]); | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { - | return batch2.getRow(buckets[idx]); + | return aggregateBufferBatch.getRow(buckets[idx]); | } | idx = (idx + 1) & (numBuckets - 1); | step++; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index e5f8643f1d517..82355e3342b9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -268,7 +268,6 @@ case class TungstenAggregate( // The name for HashMap private var hashMapTerm: String = _ private var sorterTerm: String = _ - private var abc: String = _ /** * This is called by generated Java class, should be public. @@ -410,15 +409,10 @@ case class TungstenAggregate( BindReferences.bindReference(e, inputAttrs).gen(ctx) } s""" - // 1 $evaluateKeyVars - // 2 $evaluateBufferVars - // 3 $evaluateAggResults - // 4 ${consume(ctx, resultVars)} - // 5 """ } else if (modes.contains(Partial) || modes.contains(PartialMerge)) { @@ -501,6 +495,33 @@ case class TungstenAggregate( // so `copyResult` should be reset to `false`. 
ctx.copyResult = false + def outputFromGeneratedMap: Option[String] = { + if (isAggregateHashMapEnabled) { + val row = ctx.freshName("aggregateHashMapRow") + ctx.currentVars = null + ctx.INPUT_ROW = row + var schema: StructType = groupingKeySchema + bufferSchema.foreach(i => schema = schema.add(i)) + val generateRow = GenerateUnsafeProjection.createCode(ctx, schema.toAttributes.zipWithIndex + .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }) + Option( + s""" + | while ($iterTermForGeneratedHashMap.hasNext()) { + | $numOutput.add(1); + | org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $row = + | (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row) + | $iterTermForGeneratedHashMap.next(); + | ${generateRow.code} + | ${consume(ctx, Seq.empty, {generateRow.value})} + | + | if (shouldStop()) return; + | } + | + | $aggregateHashMapTerm.batch.close(); + """.stripMargin) + } else None + } + s""" if (!$initAgg) { $initAgg = true; @@ -508,47 +529,7 @@ case class TungstenAggregate( } // output the result - ${if (isAggregateHashMapEnabled) { - val row = ctx.freshName("aggregateHashmapRow") - var schema: StructType = groupingKeySchema - bufferSchema.foreach(i => schema = schema.add(i)) - val aggregateRow = ctx.freshName("aggregateRow") - ctx.addMutableState("UnsafeProjection", aggregateRow, "") - - ctx.currentVars = null - ctx.INPUT_ROW = row - val code = GenerateUnsafeProjection.createCode(ctx, - schema.toAttributes.zipWithIndex.map { case (attr, i) => - BoundReference(i, attr.dataType, attr.nullable) - }) - - UnsafeProjection.create(schema) - val x = ctx.freshName("x") - // ctx.INPUT_ROW = row - // ctx.currentVars = null -/* - val eval = resultExpressions.map { e => - BindReferences.bindReference(e, groupingAttributes).gen(ctx) - } -*/ - s""" - while ($iterTermForGeneratedHashMap.hasNext()) { - $numOutput.add(1); - org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $row = - (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row) - $iterTermForGeneratedHashMap.next(); - ${code.code} - // UnsafeRow $x = aggregateRow.apply($row); - ${consume(ctx, Seq.empty, {code.value})} - // append(row.copy()); - - if (shouldStop()) return; - } - - $aggregateHashMapTerm.batch.close(); - """ - } else "" - } + ${outputFromGeneratedMap.getOrElse("")} while ($iterTerm.next()) { $numOutput.add(1); @@ -595,25 +576,19 @@ case class TungstenAggregate( val inputAttr = aggregateBufferAttributes ++ child.output ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input - ctx.INPUT_ROW = aggregateRow - var schema: StructType = groupingKeySchema - bufferSchema.foreach(i => schema = schema.add(i)) - // ctx.currentVars = null - val code = GenerateUnsafeProjection.createCode(ctx, - schema.toAttributes.zipWithIndex.map { case (attr, i) => - BoundReference(i, attr.dataType, attr.nullable) - }) + ctx.INPUT_ROW = aggregateRow // TODO: support subexpression elimination val aggregateRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) val updateAggregateRow = aggregateRowEvals.zipWithIndex.map { case (ev, i) => val dt = updateExpr(i).dataType ctx.updateColumn(aggregateRow, dt, i, ev, updateExpr(i).nullable) } + ctx.INPUT_ROW = buffer // TODO: support subexpression elimination - val evals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) - val updateAggregateBuffer = evals.zipWithIndex.map { case (ev, i) => + val aggregateBufferEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) + val 
updateAggregateBuffer = aggregateBufferEvals.zipWithIndex.map { case (ev, i) =>
       val dt = updateExpr(i).dataType
       ctx.updateColumn(buffer, dt, i, ev, updateExpr(i).nullable)
     }
@@ -626,6 +601,47 @@ case class TungstenAggregate(
       ("true", "", "")
     }
 
+    val findOrInsertInGeneratedHashMap: Option[String] = {
+      if (isAggregateHashMapEnabled) {
+        Option(
+          s"""
+             | $aggregateRow =
+             |   $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")});
+           """.stripMargin)
+      } else {
+        None
+      }
+    }
+
+    val findOrInsertInBytesToBytesMap: String = {
+      s"""
+         | if ($aggregateRow == null) {
+         |   // generate grouping key
+         |   ${keyCode.code.trim}
+         |   ${hashEval.code.trim}
+         |   if ($checkFallback) {
+         |     // try to get the buffer from hash map
+         |     $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value});
+         |   }
+         |   if ($buffer == null) {
+         |     if ($sorterTerm == null) {
+         |       $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
+         |     } else {
+         |       $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+         |     }
+         |     $resetCounter
+         |     // the hash map has been spilled, it should have enough memory now,
+         |     // try to allocate buffer again.
+         |     $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value});
+         |     if ($buffer == null) {
+         |       // failed to allocate the first page
+         |       throw new OutOfMemoryError("Not enough memory for aggregation");
+         |     }
+         |   }
+         | }
+       """.stripMargin
+    }
+
     // We try to do hash map based in-memory aggregation first. If there is not enough memory (the
     // hash map will return null for new key), we spill the hash map to disk to free memory, then
     // continue to do in-memory aggregation and spilling until all the rows had been processed.
     // Finally, sort the spilled aggregate buffers by key, and merge them together for same key.
@@ -633,34 +649,11 @@ case class TungstenAggregate(
     s"""
      UnsafeRow $buffer = null;
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $aggregateRow = null;
-     if ($checkFallback) {
-       ${if (isAggregateHashMapEnabled) {
-          s"""$aggregateRow =
-            $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")});"""
-       } else ""}
-       // try to get the buffer from hash map
-       if ($aggregateRow == null) {
-         // generate grouping key
-         ${keyCode.code.trim}
-         ${hashEval.code.trim}
-         $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value});
-         if ($buffer == null) {
-           | if ($sorterTerm == null) {
-           |   $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
-           | } else {
-           |   $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
-           | }
-           | $resetCounter
-           | // the hash map had be spilled, it should have enough memory now,
-           | // try to allocate buffer again.
- | $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); - | if ($buffer == null) { - | // failed to allocate the first page - | throw new OutOfMemoryError("No enough memory for aggregation"); - | } - | } - } - } + + ${findOrInsertInGeneratedHashMap.getOrElse("")} + + $findOrInsertInBytesToBytesMap + $incCounter if ($aggregateRow != null) { @@ -670,7 +663,7 @@ case class TungstenAggregate( ${updateAggregateRow.mkString("\n").trim} } else { // evaluate aggregate function - ${evaluateVariables(evals)} + ${evaluateVariables(aggregateBufferEvals)} // update aggregate buffer ${updateAggregateBuffer.mkString("\n").trim} } From 4ee56873764d62efdaf8c47cb74aa399f2194fde Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Tue, 12 Apr 2016 18:23:27 -0700 Subject: [PATCH 06/13] benchmark --- .../BenchmarkWholeStageCodegen.scala | 240 +++++++++--------- 1 file changed, 118 insertions(+), 122 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 50bcf5b7dd981..195b3e287581d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -124,56 +124,50 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { /** - * Using ImperativeAggregate (as implemented in Spark 1.6): - ** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate - *------------------------------------------------------------------------------- - *stddev w/o codegen 2019.04 10.39 1.00 X - *stddev w codegen 2097.29 10.00 0.96 X - *kurtosis w/o codegen 2108.99 9.94 0.96 X - *kurtosis w codegen 2090.69 10.03 0.97 X - ** - *Using DeclarativeAggregate: - ** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *stddev codegen=false 5630 / 5776 18.0 55.6 1.0X - *stddev codegen=true 1259 / 1314 83.0 12.0 4.5X - ** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X - *kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X + Using ImperativeAggregate (as implemented in Spark 1.6): + + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate + ------------------------------------------------------------------------------- + stddev w/o codegen 2019.04 10.39 1.00 X + stddev w codegen 2097.29 10.00 0.96 X + kurtosis w/o codegen 2108.99 9.94 0.96 X + kurtosis w codegen 2090.69 10.03 0.97 X + + Using DeclarativeAggregate: + + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + stddev codegen=false 5630 / 5776 18.0 55.6 1.0X + stddev codegen=true 1259 / 1314 83.0 12.0 4.5X + + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X + kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X */ } - 
test("aggregate with keys") { - val N = 20 << 20 + ignore("aggregate with keys") { + val N = 20 << 20 val benchmark = new Benchmark("Aggregate w keys", N) - // def f(): Unit = sqlContext.range(N).selectExpr("id", "(id & 65535) as k1", "(id & 65535) as k2").groupBy("k1", "k2").sum("id").collect() - // .foreach(println) - def f(): Unit = sqlContext.range(N).selectExpr("id", "(id & 65535) as k1").groupBy("k1").sum("id").collect() - - //def f(): Unit = sqlContext.range(N).selectExpr("id").groupBy().sum("id").collect() + def f(): Unit = sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() -/* - benchmark.addCase(s"codegen=false") { iter => + benchmark.addCase(s"codegen = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "false") f() } - */ - benchmark.addCase(s"codegen=true hashmap=false") { iter => + benchmark.addCase(s"codegen = T hashmap = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") f() } - benchmark.addCase(s"codegen=true hashmap=true") { iter => + benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") f() @@ -182,11 +176,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() /* - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Aggregate w keys codegen=false 2429 / 2644 8.6 115.8 1.0X - Aggregate w keys codegen=true 1535 / 1571 13.7 73.2 1.6X + Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4 + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + codegen = F 2124 / 2204 9.9 101.3 1.0X + codegen = T hashmap = F 1198 / 1364 17.5 57.1 1.8X + codegen = T hashmap = T 369 / 600 56.8 17.6 5.8X */ } @@ -214,12 +210,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *Join w long duplicated codegen=false 3446 / 3478 6.1 164.3 1.0X - *Join w long duplicated codegen=true 322 / 351 65.2 15.3 10.7X + Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + Join w long duplicated codegen=false 3446 / 3478 6.1 164.3 1.0X + Join w long duplicated codegen=true 322 / 351 65.2 15.3 10.7X */ val dim2 = broadcast(sqlContext.range(M) @@ -232,12 +228,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X - *Join w 2 ints codegen=true 791 / 
818 26.5 37.7 5.6X + Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X + Join w 2 ints codegen=true 791 / 818 26.5 37.7 5.6X */ val dim3 = broadcast(sqlContext.range(M) @@ -250,12 +246,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *Join w 2 longs codegen=false 5905 / 6123 3.6 281.6 1.0X - *Join w 2 longs codegen=true 2230 / 2529 9.4 106.3 2.6X + Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + Join w 2 longs codegen=false 5905 / 6123 3.6 281.6 1.0X + Join w 2 longs codegen=true 2230 / 2529 9.4 106.3 2.6X */ val dim4 = broadcast(sqlContext.range(M) @@ -268,11 +264,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X - *Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X + Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X */ runBenchmark("outer join w long", N) { @@ -280,12 +276,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X - *outer join w long codegen=true 261 / 276 80.5 12.4 11.7X + Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X + outer join w long codegen=true 261 / 276 80.5 12.4 11.7X */ runBenchmark("semi join w long", N) { @@ -293,12 +289,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X - *semi join w 
long codegen=true 237 / 244 88.3 11.3 8.1X + Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X + semi join w long codegen=true 237 / 244 88.3 11.3 8.1X */ } @@ -311,11 +307,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *merge join codegen=false 1588 / 1880 1.3 757.1 1.0X - *merge join codegen=true 1477 / 1531 1.4 704.2 1.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + merge join codegen=false 1588 / 1880 1.3 757.1 1.0X + merge join codegen=true 1477 / 1531 1.4 704.2 1.1X */ runBenchmark("sort merge join", N) { @@ -327,11 +323,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X - *sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X + sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X */ } @@ -347,12 +343,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *shuffle hash join codegen=false 1101 / 1391 3.8 262.6 1.0X - *shuffle hash join codegen=true 528 / 578 7.9 125.8 2.1X + Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + shuffle hash join codegen=false 1101 / 1391 3.8 262.6 1.0X + shuffle hash join codegen=true 528 / 578 7.9 125.8 2.1X */ } @@ -365,11 +361,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } /** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *cube codegen=false 3188 / 3392 1.6 608.2 1.0X - *cube codegen=true 1239 / 1394 4.2 236.3 2.6X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + cube codegen=false 3188 / 3392 1.6 608.2 1.0X + cube codegen=true 1239 / 1394 4.2 236.3 2.6X */ } @@ -621,21 +617,21 @@ class BenchmarkWholeStageCodegen extends 
SparkFunSuite { } /** - *Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - *BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *UnsafeRow hash 267 / 284 78.4 12.8 1.0X - *murmur3 hash 102 / 129 205.5 4.9 2.6X - *fast hash 79 / 96 263.8 3.8 3.4X - *arrayEqual 164 / 172 128.2 7.8 1.6X - *Java HashMap (Long) 321 / 399 65.4 15.3 0.8X - *Java HashMap (two ints) 328 / 363 63.9 15.7 0.8X - *Java HashMap (UnsafeRow) 1140 / 1200 18.4 54.3 0.2X - *LongToUnsafeRowMap (opt=false) 378 / 400 55.5 18.0 0.7X - *LongToUnsafeRowMap (opt=true) 144 / 152 145.2 6.9 1.9X - *BytesToBytesMap (off Heap) 1300 / 1616 16.1 62.0 0.2X - *BytesToBytesMap (on Heap) 1165 / 1202 18.0 55.5 0.2X - *Aggregate HashMap 121 / 131 173.3 5.8 2.2X + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + UnsafeRow hash 267 / 284 78.4 12.8 1.0X + murmur3 hash 102 / 129 205.5 4.9 2.6X + fast hash 79 / 96 263.8 3.8 3.4X + arrayEqual 164 / 172 128.2 7.8 1.6X + Java HashMap (Long) 321 / 399 65.4 15.3 0.8X + Java HashMap (two ints) 328 / 363 63.9 15.7 0.8X + Java HashMap (UnsafeRow) 1140 / 1200 18.4 54.3 0.2X + LongToUnsafeRowMap (opt=false) 378 / 400 55.5 18.0 0.7X + LongToUnsafeRowMap (opt=true) 144 / 152 145.2 6.9 1.9X + BytesToBytesMap (off Heap) 1300 / 1616 16.1 62.0 0.2X + BytesToBytesMap (on Heap) 1165 / 1202 18.0 55.5 0.2X + Aggregate HashMap 121 / 131 173.3 5.8 2.2X */ benchmark.run() } @@ -656,12 +652,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() /** - *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - *collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *collect 1 million 439 / 654 2.4 418.7 1.0X - *collect 2 millions 961 / 1907 1.1 916.4 0.5X - *collect 4 millions 3193 / 3895 0.3 3044.7 0.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + collect 1 million 439 / 654 2.4 418.7 1.0X + collect 2 millions 961 / 1907 1.1 916.4 0.5X + collect 4 millions 3193 / 3895 0.3 3044.7 0.1X */ } @@ -678,11 +674,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() /** - *model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) - *collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - *------------------------------------------------------------------------------------------- - *collect limit 1 million 833 / 1284 1.3 794.4 1.0X - *collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X + model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) + collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + collect limit 1 million 833 / 1284 1.3 794.4 1.0X + collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X */ } } From c2fc38584dd073036a1f04f7cd7da9fcf50739e8 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Tue, 12 Apr 2016 18:58:01 -0700 Subject: [PATCH 07/13] fix comment --- .../execution/aggregate/ColumnarAggMapCodeGenerator.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala index 954caa0cdbaee..38e46dfc8cf99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala @@ -21,10 +21,10 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.types.StructType /** - * This is a helper object to generate an append-only single-key/single value aggregate hash - * map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates - * (and fall back to the `BytesToBytesMap` if a given key isn't found). This is 'codegened' in - * TungstenAggregate to speed up aggregates w/ key. + * This is a helper class to generate an append-only aggregate hash map that can act as a 'cache' + * for extremely fast key-value lookups while evaluating aggregates (and fall back to the + * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in TungstenAggregate to speed + * up aggregates w/ key. * * It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the * key-value pairs. The index lookups in the array rely on linear probing (with a small number of From fc6b8cb337e11d3d92f0f13828b8a0b85e30929c Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 13 Apr 2016 11:48:56 -0700 Subject: [PATCH 08/13] enable conf by default for testing --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 343b95b6a9a44..6cc8331d6946b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -441,7 +441,7 @@ object SQLConf { .internal() .doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" From ececd5770e0c5410cf3da463f3b52db467d1f5ca Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 13 Apr 2016 15:12:16 -0700 Subject: [PATCH 09/13] fix tests --- .../ColumnarAggMapCodeGenerator.scala | 3 +- .../aggregate/TungstenAggregate.scala | 29 ++++++++---- .../TungstenAggregationIterator.scala | 8 ++-- .../execution/AggregationQuerySuite.scala | 47 ++++++++++--------- 4 files changed, 51 insertions(+), 36 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala index 38e46dfc8cf99..0df9cb964dad6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala @@ -191,10 +191,11 @@ class ColumnarAggMapCodeGenerator( | ${groupingKeys.zipWithIndex.map(k => s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")} | ${bufferValues.zipWithIndex.map(k => - s"batch.column(${groupingKeys.length + k._2}).putLong(numRows, 0);") + 
s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);") .mkString("\n")} | buckets[idx] = numRows++; | batch.setNumRows(numRows); + | aggregateBufferBatch.setNumRows(numRows); | return aggregateBufferBatch.getRow(buckets[idx]); | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { | return aggregateBufferBatch.getRow(buckets[idx]); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 82355e3342b9f..5d7ccbf1fdc59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -70,12 +70,14 @@ case class TungstenAggregate( } } - // This is for testing. We force TungstenAggregationIterator to fall back to sort-based - // aggregation once it has processed a given number of input rows. - private val testFallbackStartsAt: Option[Int] = { + // This is for testing. We force TungstenAggregationIterator to fall back to the bytes to bytes + // map and the sort-based aggregation once it has processed a given number of input rows. + private val testFallbackStartsAt: Option[(Int, Int)] = { sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match { case null | "" => None - case fallbackStartsAt => Some(fallbackStartsAt.toInt) + case fallbackStartsAt => + val splits = fallbackStartsAt.split(",").map(_.trim) + Some((splits.head.toInt, splits.last.toInt)) } } @@ -593,20 +595,27 @@ case class TungstenAggregate( ctx.updateColumn(buffer, dt, i, ev, updateExpr(i).nullable) } - val (checkFallback, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { + val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, + incCounter) = if (testFallbackStartsAt.isDefined) { val countTerm = ctx.freshName("fallbackCounter") ctx.addMutableState("int", countTerm, s"$countTerm = 0;") - (s"$countTerm < ${testFallbackStartsAt.get}", s"$countTerm = 0;", s"$countTerm += 1;") + (s"$countTerm < ${testFallbackStartsAt.get._1}", + s"$countTerm < ${testFallbackStartsAt.get._2}", s"$countTerm = 0;", s"$countTerm += 1;") } else { - ("true", "", "") + ("true", "true", "", "") } val findOrInsertInGeneratedHashMap: Option[String] = { if (isAggregateHashMapEnabled) { Option( s""" - | $aggregateRow = - | $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")}); + |if ($checkFallbackForGeneratedHashMap) { + | ${groupByKeys.map(_.code).mkString("\n")} + | if (${groupByKeys.map("!" 
+ _.isNull).mkString(" && ")}) { + | $aggregateRow = + | $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")}); + | } + |} """.stripMargin) } else { None @@ -619,7 +628,7 @@ case class TungstenAggregate( | // generate grouping key | ${keyCode.code.trim} | ${hashEval.code.trim} - | if ($checkFallback) { + | if ($checkFallbackForBytesToBytesMap) { | // try to get the buffer from hash map | $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value}); | } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index ce504e20e6dd3..09384a482d9fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -85,7 +85,7 @@ class TungstenAggregationIterator( newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection), originalInputAttributes: Seq[Attribute], inputIter: Iterator[InternalRow], - testFallbackStartsAt: Option[Int], + testFallbackStartsAt: Option[(Int, Int)], numOutputRows: LongSQLMetric, dataSize: LongSQLMetric, spillSize: LongSQLMetric) @@ -171,7 +171,7 @@ class TungstenAggregationIterator( // hashMap. If there is not enough memory, it will multiple hash-maps, spilling // after each becomes full then using sort to merge these spills, finally do sort // based aggregation. - private def processInputs(fallbackStartsAt: Int): Unit = { + private def processInputs(fallbackStartsAt: (Int, Int)): Unit = { if (groupingExpressions.isEmpty) { // If there is no grouping expressions, we can just reuse the same buffer over and over again. // Note that it would be better to eliminate the hash map entirely in the future. @@ -187,7 +187,7 @@ class TungstenAggregationIterator( val newInput = inputIter.next() val groupingKey = groupingProjection.apply(newInput) var buffer: UnsafeRow = null - if (i < fallbackStartsAt) { + if (i < fallbackStartsAt._2) { buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey) } if (buffer == null) { @@ -352,7 +352,7 @@ class TungstenAggregationIterator( /** * Start processing input rows. */ - processInputs(testFallbackStartsAt.getOrElse(Int.MaxValue)) + processInputs(testFallbackStartsAt.getOrElse((Int.MaxValue, Int.MaxValue))) // If we did not switch to sort-based aggregation in processInputs, // we pre-load the first key-value pair from the map (to make hasNext idempotent). 
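[Editor's note — an illustration, not part of the patch. The two-part fallback threshold
introduced above is parsed with exactly the split/trim logic shown in the TungstenAggregate
hunk; the following is a minimal, self-contained Scala sketch of that parsing, with the
object and method names invented for illustration:

    object FallbackThresholdSketch {
      // "2, 3" => Some((2, 3)): fall back from the generated aggregate hash map after
      // 2 input rows, and from the BytesToBytesMap to sort-based aggregation after 3.
      def parse(conf: String): Option[(Int, Int)] = conf match {
        case null | "" => None
        case s =>
          val splits = s.split(",").map(_.trim)
          // A single value "n" yields (n, n), since head and last are the same element.
          Some((splits.head.toInt, splits.last.toInt))
      }

      def main(args: Array[String]): Unit = {
        assert(parse("2, 3") == Some((2, 3)))
        assert(parse("5") == Some((5, 5)))
        assert(parse(null) == None)
      }
    }

Setting the conf to "${fallbackStartsAt - 1}, $fallbackStartsAt", as the test suite below
does, therefore exercises the generated-map cutover one row before the unsafe-map cutover.]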
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 94fbcb7ee2056..84bb7edf03821 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -967,27 +967,32 @@ class TungstenAggregationQuerySuite extends AggregationQuerySuite
 class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
 
   override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
-    (0 to 2).foreach { fallbackStartsAt =>
-      withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> fallbackStartsAt.toString) {
-        // Create a new df to make sure its physical operator picks up
-        // spark.sql.TungstenAggregate.testFallbackStartsAt.
-        // todo: remove it?
-        val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)
-
-        QueryTest.checkAnswer(newActual, expectedAnswer) match {
-          case Some(errorMessage) =>
-            val newErrorMessage =
-              s"""
-                |The following aggregation query failed when using TungstenAggregate with
-                |controlled fallback (it falls back to sort-based aggregation once it has processed
-                |$fallbackStartsAt input rows). The query is
-                |${actual.queryExecution}
-                |
-                |$errorMessage
-              """.stripMargin
-
-            fail(newErrorMessage)
-          case None =>
+    Seq(false, true).foreach { enableColumnarHashMap =>
+      withSQLConf("spark.sql.codegen.aggregate.map.enabled" -> enableColumnarHashMap.toString) {
+        (1 to 3).foreach { fallbackStartsAt =>
+          withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
+            s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") {
+            // Create a new df to make sure its physical operator picks up
+            // spark.sql.TungstenAggregate.testFallbackStartsAt.
+            // todo: remove it?
+            val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)
+
+            QueryTest.checkAnswer(newActual, expectedAnswer) match {
+              case Some(errorMessage) =>
+                val newErrorMessage =
+                  s"""
+                    |The following aggregation query failed when using TungstenAggregate with
+                    |controlled fallback (it falls back to the bytes to bytes map once it has processed
+                    |${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has
+                    |processed $fallbackStartsAt input rows).
The query is ${actual.queryExecution} + | + |$errorMessage + """.stripMargin + + fail(newErrorMessage) + case None => // Success + } + } } } } From 0ca0db17130bb6a1e59cdbc699f5abd946821d44 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 13 Apr 2016 15:31:24 -0700 Subject: [PATCH 10/13] review comments --- .../ColumnarAggMapCodeGenerator.scala | 25 +++++++++++++++++-- .../aggregate/TungstenAggregate.scala | 4 +-- .../BenchmarkWholeStageCodegen.scala | 6 ++--- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala index 0df9cb964dad6..c315446cf62fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala @@ -52,6 +52,10 @@ class ColumnarAggMapCodeGenerator( |${generateEquals()} | |${generateHashFunction()} + | + |${generateRowIterator()} + | + |${generateClose()} |} """.stripMargin } @@ -74,8 +78,8 @@ class ColumnarAggMapCodeGenerator( """.stripMargin s""" - | public org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; - | public org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch; + | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch; | private int[] buckets; | private int numBuckets; | private int maxSteps; @@ -208,4 +212,21 @@ class ColumnarAggMapCodeGenerator( |} """.stripMargin } + + private def generateRowIterator(): String = { + s""" + |public java.util.Iterator + | rowIterator() { + | return batch.rowIterator(); + |} + """.stripMargin + } + + private def generateClose(): String = { + s""" + |public void close() { + | batch.close(); + |} + """.stripMargin + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 5d7ccbf1fdc59..253af3045edb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -481,7 +481,7 @@ case class TungstenAggregate( ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} ${if (isAggregateHashMapEnabled) { - s"$iterTermForGeneratedHashMap = $aggregateHashMapTerm.batch.rowIterator();"} else ""} + s"$iterTermForGeneratedHashMap = $aggregateHashMapTerm.rowIterator();"} else ""} $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm); } @@ -519,7 +519,7 @@ case class TungstenAggregate( | if (shouldStop()) return; | } | - | $aggregateHashMapTerm.batch.close(); + | $aggregateHashMapTerm.close(); """.stripMargin) } else None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 195b3e287581d..d23f19c480633 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -180,9 +180,9 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz 
Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - codegen = F 2124 / 2204 9.9 101.3 1.0X - codegen = T hashmap = F 1198 / 1364 17.5 57.1 1.8X - codegen = T hashmap = T 369 / 600 56.8 17.6 5.8X + codegen = F 2219 / 2392 9.4 105.8 1.0X + codegen = T hashmap = F 1330 / 1466 15.8 63.4 1.7X + codegen = T hashmap = T 384 / 518 54.7 18.3 5.8X */ } From 041c001957999de8d383a10c0b7126c62f98ad9b Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 13 Apr 2016 22:32:10 -0700 Subject: [PATCH 11/13] CR --- .../aggregate/TungstenAggregate.scala | 165 ++++++++++-------- ...scala => VectorizedHashMapGenerator.scala} | 15 +- 2 files changed, 105 insertions(+), 75 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/{ColumnarAggMapCodeGenerator.scala => VectorizedHashMapGenerator.scala} (94%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 253af3045edb4..c4341b883fc37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -70,8 +70,8 @@ case class TungstenAggregate( } } - // This is for testing. We force TungstenAggregationIterator to fall back to the bytes to bytes - // map and the sort-based aggregation once it has processed a given number of input rows. + // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash + // map and/or the sort-based aggregation once it has processed a given number of input rows. 
private val testFallbackStartsAt: Option[(Int, Int)] = { sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match { case null | "" => None @@ -263,11 +263,11 @@ case class TungstenAggregate( .map(_.asInstanceOf[DeclarativeAggregate]) private val bufferSchema = StructType.fromAttributes(aggregateBufferAttributes) - // The name for AggregateHashMap - private var aggregateHashMapTerm: String = _ - private var isAggregateHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled + // The name for Vectorized HashMap + private var vectorizedHashMapTerm: String = _ + private var isVectorizedHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled - // The name for HashMap + // The name for UnsafeRow HashMap private var hashMapTerm: String = _ private var sorterTerm: String = _ @@ -443,21 +443,21 @@ case class TungstenAggregate( val initAgg = ctx.freshName("initAgg") ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") - // We currently only enable aggregate hashmap for long key/value types - isAggregateHashMapEnabled = isAggregateHashMapEnabled && + // We currently only enable vectorized hashmap for long key/value types + isVectorizedHashMapEnabled = isVectorizedHashMapEnabled && (groupingKeySchema ++ bufferSchema).forall(_.dataType == LongType) - aggregateHashMapTerm = ctx.freshName("aggregateHashMap") - val aggregateHashMapClassName = ctx.freshName("GeneratedAggregateHashMap") - val aggregateHashMapGenerator = new ColumnarAggMapCodeGenerator(ctx, aggregateHashMapClassName, + vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap") + val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap") + val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, vectorizedHashMapClassName, groupingKeySchema, bufferSchema) - // Create a name for iterator from AggregateHashMap - val iterTermForGeneratedHashMap = ctx.freshName("genMapIter") - if (isAggregateHashMapEnabled) { - ctx.addMutableState(aggregateHashMapClassName, aggregateHashMapTerm, - s"$aggregateHashMapTerm = new $aggregateHashMapClassName();") + // Create a name for iterator from vectorized HashMap + val iterTermForVectorizedHashMap = ctx.freshName("vectorizedHashMapIter") + if (isVectorizedHashMapEnabled) { + ctx.addMutableState(vectorizedHashMapClassName, vectorizedHashMapTerm, + s"$vectorizedHashMapTerm = new $vectorizedHashMapClassName();") ctx.addMutableState( "java.util.Iterator", - iterTermForGeneratedHashMap, "") + iterTermForVectorizedHashMap, "") } // create hashMap @@ -475,13 +475,13 @@ case class TungstenAggregate( val doAgg = ctx.freshName("doAggregateWithKeys") ctx.addNewFunction(doAgg, s""" - ${if (isAggregateHashMapEnabled) aggregateHashMapGenerator.generate() else ""} + ${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""} private void $doAgg() throws java.io.IOException { $hashMapTerm = $thisPlan.createHashMap(); ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} - ${if (isAggregateHashMapEnabled) { - s"$iterTermForGeneratedHashMap = $aggregateHashMapTerm.rowIterator();"} else ""} + ${if (isVectorizedHashMapEnabled) { + s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""} $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm); } @@ -497,9 +497,10 @@ case class TungstenAggregate( // so `copyResult` should be reset to `false`. 
ctx.copyResult = false + // Iterate over the aggregate rows and convert them from ColumnarBatch.Row to UnsafeRow def outputFromGeneratedMap: Option[String] = { - if (isAggregateHashMapEnabled) { - val row = ctx.freshName("aggregateHashMapRow") + if (isVectorizedHashMapEnabled) { + val row = ctx.freshName("vectorizedHashMapRow") ctx.currentVars = null ctx.INPUT_ROW = row var schema: StructType = groupingKeySchema @@ -508,18 +509,18 @@ case class TungstenAggregate( .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }) Option( s""" - | while ($iterTermForGeneratedHashMap.hasNext()) { + | while ($iterTermForVectorizedHashMap.hasNext()) { | $numOutput.add(1); | org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $row = | (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row) - | $iterTermForGeneratedHashMap.next(); + | $iterTermForVectorizedHashMap.next(); | ${generateRow.code} | ${consume(ctx, Seq.empty, {generateRow.value})} | | if (shouldStop()) return; | } | - | $aggregateHashMapTerm.close(); + | $vectorizedHashMapTerm.close(); """.stripMargin) } else None } @@ -553,13 +554,13 @@ case class TungstenAggregate( // create grouping key ctx.currentVars = input - val keyCode = GenerateUnsafeProjection.createCode( + val unsafeRowKeyCode = GenerateUnsafeProjection.createCode( ctx, groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output))) - val groupByKeys = ctx.generateExpressions( + val vectorizedRowKeys = ctx.generateExpressions( groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output))) - val key = keyCode.value - val buffer = ctx.freshName("aggBuffer") - val aggregateRow = ctx.freshName("aggregateRow") + val unsafeRowKeys = unsafeRowKeyCode.value + val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer") + val vectorizedRowBuffer = ctx.freshName("vectorizedAggBuffer") // only have DeclarativeAggregate val updateExpr = aggregateExpressions.flatMap { e => @@ -579,22 +580,6 @@ case class TungstenAggregate( val inputAttr = aggregateBufferAttributes ++ child.output ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input - ctx.INPUT_ROW = aggregateRow - // TODO: support subexpression elimination - val aggregateRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) - val updateAggregateRow = aggregateRowEvals.zipWithIndex.map { case (ev, i) => - val dt = updateExpr(i).dataType - ctx.updateColumn(aggregateRow, dt, i, ev, updateExpr(i).nullable) - } - - ctx.INPUT_ROW = buffer - // TODO: support subexpression elimination - val aggregateBufferEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) - val updateAggregateBuffer = aggregateBufferEvals.zipWithIndex.map { case (ev, i) => - val dt = updateExpr(i).dataType - ctx.updateColumn(buffer, dt, i, ev, updateExpr(i).nullable) - } - val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { val countTerm = ctx.freshName("fallbackCounter") @@ -605,15 +590,17 @@ case class TungstenAggregate( ("true", "true", "", "") } - val findOrInsertInGeneratedHashMap: Option[String] = { - if (isAggregateHashMapEnabled) { + // We first generate code to probe and update the vectorized hash map. 
If the probe is
+    // successful, the corresponding vectorized row buffer will hold the mutable row.
+    val findOrInsertInVectorizedHashMap: Option[String] = {
+      if (isVectorizedHashMapEnabled) {
         Option(
           s"""
              |if ($checkFallbackForGeneratedHashMap) {
-             |  ${groupByKeys.map(_.code).mkString("\n")}
-             |  if (${groupByKeys.map("!" + _.isNull).mkString(" && ")}) {
-             |    $aggregateRow =
-             |      $aggregateHashMapTerm.findOrInsert(${groupByKeys.map(_.value).mkString(", ")});
+             |  ${vectorizedRowKeys.map(_.code).mkString("\n")}
+             |  if (${vectorizedRowKeys.map("!" + _.isNull).mkString(" && ")}) {
+             |    $vectorizedRowBuffer = $vectorizedHashMapTerm.findOrInsert(
+             |        ${vectorizedRowKeys.map(_.value).mkString(", ")});
              |  }
              |}
            """.stripMargin)
       } else {
         None
       }
     }
 
+    val updateRowInVectorizedHashMap: Option[String] = {
+      if (isVectorizedHashMapEnabled) {
+        ctx.INPUT_ROW = vectorizedRowBuffer
+        val vectorizedRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx))
+        val updateVectorizedRow = vectorizedRowEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          ctx.updateColumn(vectorizedRowBuffer, dt, i, ev, updateExpr(i).nullable)
+        }
+        Option(
+          s"""
+             |// evaluate aggregate function
+             |${evaluateVariables(vectorizedRowEvals)}
+             |// update vectorized row
+             |${updateVectorizedRow.mkString("\n").trim}
+           """.stripMargin)
+      } else None
+    }
+
+    // Next, we generate code to probe and update the unsafe row hash map.
+    val findOrInsertInUnsafeRowMap: String = {
       s"""
-       | if ($aggregateRow == null) {
+       | if ($vectorizedRowBuffer == null) {
        |   // generate grouping key
-       | ${keyCode.code.trim}
+       | ${unsafeRowKeyCode.code.trim}
        | ${hashEval.code.trim}
        |   if ($checkFallbackForBytesToBytesMap) {
        |     // try to get the buffer from hash map
-       |     $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value});
+       |     $unsafeRowBuffer =
+       |       $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
        |   }
-       |   if ($buffer == null) {
+       |   if ($unsafeRowBuffer == null) {
        |     if ($sorterTerm == null) {
        |       $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
        |     } else {
        |       $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
        |     }
        |     $resetCounter
        |     // the hash map has been spilled, it should have enough memory now,
        |     // try to allocate buffer again.
-       |     $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value});
-       |     if ($buffer == null) {
+       |     $unsafeRowBuffer =
+       |       $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
+       |     if ($unsafeRowBuffer == null) {
        |       // failed to allocate the first page
        |       throw new OutOfMemoryError("Not enough memory for aggregation");
        |     }
        |   }
        | }
       """.stripMargin
     }
 
+    val updateRowInUnsafeRowMap: String = {
+      ctx.INPUT_ROW = unsafeRowBuffer
+      val unsafeRowBufferEvals =
+        updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx))
+      val updateUnsafeRowBuffer = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
+        val dt = updateExpr(i).dataType
+        ctx.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
+      }
+      s"""
+         |// evaluate aggregate function
+         |${evaluateVariables(unsafeRowBufferEvals)}
+         |// update unsafe row buffer
+         |${updateUnsafeRowBuffer.mkString("\n").trim}
+       """.stripMargin
+    }
+
     // We try to do hash map based in-memory aggregation first.
If there is not enough memory (the
     // hash map will return null for new key), we spill the hash map to disk to free memory, then
     // continue to do in-memory aggregation and spilling until all the rows had been processed.
     // Finally, sort the spilled aggregate buffers by key, and merge them together for same key.
     s"""
      UnsafeRow $unsafeRowBuffer = null;
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $vectorizedRowBuffer = null;
 
-     ${findOrInsertInGeneratedHashMap.getOrElse("")}
+     ${findOrInsertInVectorizedHashMap.getOrElse("")}
 
-     $findOrInsertInBytesToBytesMap
+     $findOrInsertInUnsafeRowMap
 
      $incCounter
 
-     if ($aggregateRow != null) {
-       // evaluate aggregate function
-       ${evaluateVariables(aggregateRowEvals)}
-       // update aggregate row
-       ${updateAggregateRow.mkString("\n").trim}
+     if ($vectorizedRowBuffer != null) {
+       // update vectorized row
+       ${updateRowInVectorizedHashMap.getOrElse("")}
      } else {
-       // evaluate aggregate function
-       ${evaluateVariables(aggregateBufferEvals)}
-       // update aggregate buffer
-       ${updateAggregateBuffer.mkString("\n").trim}
+       // update unsafe row
+       $updateRowInUnsafeRowMap
      }
    """
  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
similarity index 94%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index c315446cf62fc..395cc7ab91709 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.types.StructType
 
 /**
- * This is a helper class to generate an append-only aggregate hash map that can act as a 'cache'
+ * This is a helper class to generate an append-only vectorized hash map that can act as a 'cache'
  * for extremely fast key-value lookups while evaluating aggregates (and fall back to the
  * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in TungstenAggregate to speed
  * up aggregates w/ key.
@@ -31,9 +31,14 @@ import org.apache.spark.sql.types.StructType
  * maximum tries) and use an inexpensive hash function which makes it really efficient for a
  * majority of lookups. However, using linear probing and an inexpensive hash function also makes it
  * less robust as compared to the `BytesToBytesMap` (especially for a large number of keys or even
- * for certain distribution of keys) and requires us to fall back on the latter for correctness.
+ * for certain distributions of keys) and requires us to fall back on the latter for correctness. We
+ * also use a secondary columnar batch that logically projects over the original columnar batch and
+ * is equivalent to the `BytesToBytesMap` aggregate buffer.
+ *
+ * NOTE: This vectorized hash map currently doesn't support nullable keys and falls back to the
+ * `BytesToBytesMap` to store them.
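+ *
+ * (Editor's illustration, not part of the original patch; `hash`, `equals`, `buckets`,
+ * `numBuckets` and `maxSteps` stand for the members this class generates.) The generated
+ * probe loop is logically equivalent to the following Scala sketch:
+ * {{{
+ *   def find(keys: Long*): Int = {
+ *     var idx = hash(keys).toInt & (numBuckets - 1) // cheap mask: numBuckets is a power of 2
+ *     var step = 0
+ *     while (step < maxSteps) {
+ *       // An empty slot means the key can be inserted here; a match means update in place.
+ *       if (buckets(idx) == -1 || equals(idx, keys)) return idx
+ *       idx = (idx + 1) & (numBuckets - 1) // linear probe to the next bucket
+ *       step += 1
+ *     }
+ *     -1 // not found within maxSteps: the caller falls back to the BytesToBytesMap
+ *   }
+ * }}}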
*/ -class ColumnarAggMapCodeGenerator( +class VectorizedHashMapGenerator( ctx: CodegenContext, generatedClassName: String, groupingKeySchema: StructType, @@ -96,13 +101,17 @@ class ColumnarAggMapCodeGenerator( | assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0)); | this.maxSteps = DEFAULT_MAX_STEPS; | numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR); + | | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); + | + | // TODO: Possibly generate this projection in TungstenAggregate directly | aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate( | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); | for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) { | aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length})); | } + | | buckets = new int[numBuckets]; | java.util.Arrays.fill(buckets, -1); | } From ec66a5404d07da359763e4e8f8af909d045d3eb1 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Thu, 14 Apr 2016 16:54:57 -0700 Subject: [PATCH 12/13] use only for partial aggregation --- .../spark/sql/execution/aggregate/TungstenAggregate.scala | 3 ++- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index c4341b883fc37..55b81170718ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -265,7 +265,8 @@ case class TungstenAggregate( // The name for Vectorized HashMap private var vectorizedHashMapTerm: String = _ - private var isVectorizedHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled + private var isVectorizedHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled && + (modes.contains(Partial) || modes.contains(PartialMerge)) // The name for UnsafeRow HashMap private var hashMapTerm: String = _ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6cc8331d6946b..995b60db3a8dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -437,11 +437,12 @@ object SQLConf { .stringConf .createOptional + // TODO: This is still WIP and shouldn't be turned on without extensive test coverage val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled") .internal() .doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" From 9b5ee1d9d648ae454cc0749047528509a032c7ab Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Thu, 14 Apr 2016 18:21:52 -0700 Subject: [PATCH 13/13] CR --- .../sql/execution/aggregate/TungstenAggregate.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 55b81170718ff..f585759e583c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -265,8 +265,11 @@ case class TungstenAggregate( // The name for Vectorized HashMap private var vectorizedHashMapTerm: String = _ - private var isVectorizedHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled && - (modes.contains(Partial) || modes.contains(PartialMerge)) + + // We currently only enable vectorized hashmap for long key/value types and partial aggregates + private val isVectorizedHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled && + (groupingKeySchema ++ bufferSchema).forall(_.dataType == LongType) && + modes.forall(mode => mode == Partial || mode == PartialMerge) // The name for UnsafeRow HashMap private var hashMapTerm: String = _ @@ -444,9 +447,6 @@ case class TungstenAggregate( val initAgg = ctx.freshName("initAgg") ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") - // We currently only enable vectorized hashmap for long key/value types - isVectorizedHashMapEnabled = isVectorizedHashMapEnabled && - (groupingKeySchema ++ bufferSchema).forall(_.dataType == LongType) vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap") val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap") val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, vectorizedHashMapClassName,