From eb62fd357741602cad09a3413db6981d3a3091e4 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sun, 11 May 2014 14:41:42 +0800
Subject: [PATCH 1/5] [SPARK-1368] Optimized HiveTableScan
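
Replaces the per-row inputRdd.map in execute() with a mapPartitions that
reuses a single GenericMutableRow per partition and fills it with a while
loop, and moves Hive value unwrapping (HiveVarchar, HiveDecimal and literal
"NULL" strings) into unwrapHiveData, so the old per-row
buildRow(values.map {...}) allocation disappears from the critical path.
In isolation, the reuse pattern boils down to the standalone sketch below
(plain Array[Any] and toy extractor functions stand in for Catalyst's
GenericMutableRow and attributeFunctions):

    object RowReuseSketch extends App {
      // Toy stand-ins for attributeFunctions: one extractor per column.
      val fieldExtractors: Array[Any => Any] =
        Array(v => v.toString.toUpperCase, identity)

      // Raw rows, as a deserializer would hand them over.
      val rawRows = Iterator[Array[Any]](Array("a", 1), Array("b", 2))

      // Allocated once per partition, mutated in place for every row.
      val mutableRow = new Array[Any](fieldExtractors.length)

      val converted = rawRows.map { raw =>
        var i = 0
        while (i < fieldExtractors.length) {
          mutableRow(i) = fieldExtractors(i)(raw(i))
          i += 1
        }
        mutableRow // the same object every time
      }

      converted.foreach(row => println(row.mkString(", ")))
    }

As in the real scan, emitting one shared object is only safe because each
row is consumed before the iterator's next() is called.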
---
 .../apache/spark/sql/hive/hiveOperators.scala | 51 +++++++++++++------
 1 file changed, 36 insertions(+), 15 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index 96faebc5a8687..aadd7f531cdc2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -94,7 +94,7 @@ case class HiveTableScan(
         (_: Any, partitionKeys: Array[String]) => {
           val value = partitionKeys(ordinal)
           val dataType = relation.partitionKeys(ordinal).dataType
-          castFromString(value, dataType)
+          unwrapHiveData(castFromString(value, dataType))
         }
       } else {
         val ref = objectInspector.getAllStructFieldRefs
@@ -102,12 +102,19 @@ case class HiveTableScan(
           .getOrElse(sys.error(s"Can't find attribute $a"))
         (row: Any, _: Array[String]) => {
           val data = objectInspector.getStructFieldData(row, ref)
-          unwrapData(data, ref.getFieldObjectInspector)
+          unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
         }
       }
     }
   }
 
+  private def unwrapHiveData(value: Any) = value match {
+    case maybeNull: String if maybeNull.toLowerCase == "null" => null
+    case varchar: HiveVarchar => varchar.getValue
+    case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
+    case other => other
+  }
+
   private def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
@@ -143,20 +150,34 @@ case class HiveTableScan(
   }
 
   def execute() = {
-    inputRdd.map { row =>
-      val values = row match {
-        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
-          attributeFunctions.map(_(deserializedRow, partitionKeys))
-        case deserializedRow: AnyRef =>
-          attributeFunctions.map(_(deserializedRow, Array.empty))
+    inputRdd.mapPartitions { iterator =>
+      if (iterator.isEmpty) {
+        Iterator.empty
+      } else {
+        val mutableRow = new GenericMutableRow(attributes.length)
+        val buffered = iterator.buffered
+
+        (buffered.head match {
+          case Array(_, _) =>
+            buffered.map { case Array(deserializedRow, partitionKeys: Array[String]) =>
+              (deserializedRow, partitionKeys)
+            }
+
+          case _ =>
+            buffered.map { deserializedRow =>
+              (deserializedRow, Array.empty[String])
+            }
+        }).map { case (deserializedRow, partitionKeys: Array[String]) =>
+          var i = 0
+
+          while (i < attributes.length) {
+            mutableRow(i) = attributeFunctions(i)(deserializedRow, partitionKeys)
+            i += 1
+          }
+
+          mutableRow: Row
+        }
       }
-      buildRow(values.map {
-        case n: String if n.toLowerCase == "null" => null
-        case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => varchar.getValue
-        case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
-          BigDecimal(decimal.bigDecimalValue)
-        case other => other
-      })
     }
   }
 

From 6d1c64278de00cfe5cce51483e2cabc844b3f74d Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 14 May 2014 00:33:06 +0800
Subject: [PATCH 2/5] Using ColumnProjectionUtils to optimise RCFile and ORC
 column pruning
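
The scan now announces the columns it actually needs before the RDD is
created: addColumnMetadataToConf uses ColumnProjectionUtils to record the
needed column IDs and names in the job conf, and publishes the scanned
schema under the serdeConstants keys, so columnar input formats (RCFile,
ORC) can skip deserializing every other column. Assuming the key strings
defined by Hive 0.12's ColumnProjectionUtils and serdeConstants, a pruned
read of columns 0 and 2 of a three-column table leaves the conf looking
roughly like this sketch:

    import org.apache.hadoop.conf.Configuration

    object ColumnPruningConfSketch extends App {
      val conf = new Configuration()

      // ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR and
      // READ_COLUMN_NAMES_CONF_STR: the ordinals and names to materialize.
      conf.set("hive.io.file.readcolumn.ids", "0,2")
      conf.set("hive.io.file.readcolumn.names", "key,value")

      // serdeConstants.LIST_COLUMNS / LIST_COLUMN_TYPES, mirroring
      // addColumnMetadataToConf: internal names of the needed columns,
      // plus the types of all columns from the table's object inspector.
      conf.set("columns", "_col0,_col2")
      conf.set("columns.types", "int,string,string")

      println(conf.get("hive.io.file.readcolumn.ids")) // "0,2"
    }

When every column is needed, ColumnProjectionUtils.setFullyReadColumns is
called instead, which signals a full-width read.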
---
 .../apache/spark/sql/hive/hiveOperators.scala | 48 ++++++++++++++++---
 1 file changed, 42 insertions(+), 6 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index aadd7f531cdc2..30a5c067cfc76 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
 import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
-import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 
@@ -119,6 +122,38 @@ case class HiveTableScan(
     Cast(Literal(value), dataType).eval(null)
   }
 
+  private def addColumnMetadataToConf(hiveConf: HiveConf) {
+    // Specifies IDs and internal names of columns to be scanned.
+    val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
+    val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
+
+    if (attributes.size == relation.output.size) {
+      ColumnProjectionUtils.setFullyReadColumns(hiveConf)
+    } else {
+      ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
+    }
+
+    ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        relation.tableDesc.getDeserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
+  }
+
+  addColumnMetadataToConf(sc.hiveconf)
+
   @transient
   def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
@@ -156,18 +191,19 @@
       } else {
         val mutableRow = new GenericMutableRow(attributes.length)
         val buffered = iterator.buffered
-
-        (buffered.head match {
+        val rowsAndPartitionKeys = buffered.head match {
           case Array(_, _) =>
             buffered.map { case Array(deserializedRow, partitionKeys: Array[String]) =>
               (deserializedRow, partitionKeys)
             }
 
           case _ =>
-            buffered.map { deserializedRow =>
-              (deserializedRow, Array.empty[String])
+            buffered.map {
+              (_, Array.empty[String])
             }
-        }).map { case (deserializedRow, partitionKeys: Array[String]) =>
+        }
+
+        rowsAndPartitionKeys.map { case (deserializedRow, partitionKeys) =>
           var i = 0
 
           while (i < attributes.length) {

From bf0e7dce3879174d35c48ca2e30c4f55bd827f8a Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 15 May 2014 12:08:44 +0800
Subject: [PATCH 3/5] Added SortedOperation pattern to match *some* definitely
 sorted operations and avoid some sorting cost in HiveComparisonTest.
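
prepareAnswer previously collected global Sort operators from the executed
physical plan to decide whether an answer needed re-sorting; matching
SortedOperation against the logical plan does the same job declaratively
and also skips sorting for EXPLAIN output. In isolation, the extractor
idiom looks like the toy sketch below (a simplified plan ADT, not the
Catalyst classes):

    object SortedOperationSketch extends App {
      sealed trait Plan
      case class Sort(order: Seq[String], child: Plan) extends Plan
      case class Filter(condition: String, child: Plan) extends Plan
      case object Scan extends Plan

      // Matches a Sort, possibly under filters, and surfaces its ordering.
      object SortedOperation {
        def unapply(plan: Plan): Option[Seq[String]] = plan match {
          case Sort(order, _) => Some(order)
          case Filter(_, child) => unapply(child)
          case _ => None
        }
      }

      Filter("a > 1", Sort(Seq("a ASC"), Scan)) match {
        case SortedOperation(order) => println(s"already sorted by $order")
        case _ => println("must sort both answers before comparing")
      }
    }

The pattern is deliberately conservative: everything it matches is sorted,
but plenty of sorted plans fall through to the sorting fallback.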
---
 .../spark/sql/catalyst/planning/patterns.scala  | 13 +++++++++++++
 .../apache/spark/sql/execution/Aggregate.scala  |  2 +-
 .../sql/hive/execution/HiveComparisonTest.scala | 17 ++++++-----------
 3 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 4544b32958c7e..6a1d83ad2f1af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -168,3 +168,16 @@ object Unions {
     case other => other :: Nil
   }
 }
+
+/**
+ * A pattern that matches (some) sorted operations and returns corresponding sorting orders.
+ * Currently operations matched by this pattern are guaranteed to be sorted, but not all sorted
+ * operations are matched by this pattern.
+ */
+object SortedOperation {
+  // TODO (lian) detect more sorted operations
+  def unapply(plan: LogicalPlan): Option[Seq[SortOrder]] = plan match {
+    case FilteredOperation(_, Sort(order, _)) => Some(order)
+    case _ => None
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 36b3b956da96c..604914e547790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -116,7 +116,7 @@ case class Aggregate(
    */
   @transient
   private[this] lazy val resultMap =
-    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap
+    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
 
   /**
    * Substituted version of aggregateExpressions expressions which are used to compute final
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index edff38b901073..43939d6cf0c3b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.execution
 
 import java.io._
 
+import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
+
 import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.planning.SortedOperation
 import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand}
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.Sort
-import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 import org.apache.spark.sql.hive.test.TestHive
 
 /**
@@ -131,14 +132,8 @@ abstract class HiveComparisonTest
     val orderedAnswer = hiveQuery.logical match {
       // Clean out non-deterministic time schema info.
       case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
-      case _: ExplainCommand => answer
-      case _ =>
-        // TODO: Really we only care about the final total ordering here...
-        val isOrdered = hiveQuery.executedPlan.collect {
-          case s @ Sort(_, global, _) if global => s
-        }.nonEmpty
-        // If the query results aren't sorted, then sort them to ensure deterministic answers.
-        if (!isOrdered) answer.sorted else answer
+      case _: ExplainCommand | SortedOperation(_) => answer
+      case _ => answer.sorted
     }
     orderedAnswer.map(cleanPaths)
   }
@@ -161,7 +156,7 @@ abstract class HiveComparisonTest
     "minFileSize"
   )
   protected def nonDeterministicLine(line: String) =
-    nonDeterministicLineIndicators.map(line contains _).reduceLeft(_||_)
+    nonDeterministicLineIndicators.exists(line contains _)
 
   /**
    * Removes non-deterministic paths from `str` so cached answers will compare correctly.

From cf640d8e7d41dc6f30c075c5e06f3f66a22c1344 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 15 May 2014 12:09:53 +0800
Subject: [PATCH 4/5] More HiveTableScan optimisations:

- Using mutable pairs
- Avoiding pattern matching (Array.unapply function calls)
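
Both changes target per-row allocation on the scan's critical path: a
fresh Tuple2 per row becomes one MutablePair reused across the partition,
and case Array(a, b) patterns (which go through Array.unapplySeq) become a
type test plus direct index access. The pair reuse amounts to the
standalone sketch below (a toy class mirroring
org.apache.spark.util.MutablePair, not the real one):

    object MutablePairSketch extends App {
      // Toy equivalent of org.apache.spark.util.MutablePair.
      final class MutablePair[T1, T2](var _1: T1, var _2: T2) {
        def update(n1: T1, n2: T2): this.type = { _1 = n1; _2 = n2; this }
      }

      val pair = new MutablePair[String, Int]("", 0) // allocated once

      val rows = Iterator("a" -> 1, "b" -> 2).map { kv =>
        pair.update(kv._1, kv._2) // no Tuple2 allocation per element
      }

      // Safe only because each pair is consumed before the next update.
      rows.foreach(p => println(p._1 + " -> " + p._2))
    }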
---
 .../apache/spark/sql/hive/hiveOperators.scala | 26 ++++++++++++-------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index 30a5c067cfc76..f141139ef46a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
 import org.apache.spark.{TaskContext, SparkException}
+import org.apache.spark.util.MutablePair
 
 /* Implicits */
 import scala.collection.JavaConversions._
@@ -190,27 +191,34 @@ case class HiveTableScan(
         Iterator.empty
       } else {
         val mutableRow = new GenericMutableRow(attributes.length)
+        val mutablePair = new MutablePair[Any, Array[String]]()
         val buffered = iterator.buffered
+
+        // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
+        // matching are avoided intentionally.
         val rowsAndPartitionKeys = buffered.head match {
-          case Array(_, _) =>
-            buffered.map { case Array(deserializedRow, partitionKeys: Array[String]) =>
-              (deserializedRow, partitionKeys)
+          // With partition keys
+          case _: Array[Any] =>
+            buffered.map { case array: Array[Any] =>
+              val deserializedRow = array(0)
+              val partitionKeys = array(1).asInstanceOf[Array[String]]
+              mutablePair.update(deserializedRow, partitionKeys)
             }
 
+          // Without partition keys
           case _ =>
-            buffered.map {
-              (_, Array.empty[String])
+            val emptyPartitionKeys = Array.empty[String]
+            buffered.map { deserializedRow =>
+              mutablePair.update(deserializedRow, emptyPartitionKeys)
             }
         }
 
-        rowsAndPartitionKeys.map { case (deserializedRow, partitionKeys) =>
+        rowsAndPartitionKeys.map { pair =>
           var i = 0
-
           while (i < attributes.length) {
-            mutableRow(i) = attributeFunctions(i)(deserializedRow, partitionKeys)
+            mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
             i += 1
           }
-
           mutableRow: Row
         }
       }

From 4241a19bcb529ca3e2cd3f4ea9e2ab3aa9f93f6d Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sun, 18 May 2014 20:22:43 +0800
Subject: [PATCH 5/5] Distinguishes sorted and possibly not sorted operations
 more accurately in HiveComparisonTest
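
SortedOperation only looked through filters and projections above a Sort,
so it missed sorted plans and said nothing about operators that destroy a
child's ordering. The local isSorted introduced here walks the logical plan
instead: Join, Aggregate, Distinct, Generate, Sample and raw relations
pessimistically count as unsorted, a Sort found under a PhysicalOperation
counts as sorted, and anything else defers to its children. On toy plan
types the walk behaves like this sketch (illustrative stand-ins, not the
Catalyst operators):

    object IsSortedSketch extends App {
      sealed trait Plan { def children: Seq[Plan] }
      case class Sort(child: Plan) extends Plan { def children = Seq(child) }
      case class Filter(child: Plan) extends Plan { def children = Seq(child) }
      case class Aggregate(child: Plan) extends Plan { def children = Seq(child) }
      case object Scan extends Plan { def children = Nil }

      def isSorted(plan: Plan): Boolean = plan match {
        case _: Aggregate => false // reshuffles rows: child order is lost
        case _: Sort => true       // establishes an output ordering
        case _ => plan.children.exists(isSorted)
      }

      println(isSorted(Filter(Sort(Scan))))    // true: filters keep order
      println(isSorted(Aggregate(Sort(Scan)))) // false: aggregation reorders
    }

ExplainCommand keeps a dedicated branch since its output is plan text
rather than rows.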
---
 .../spark/sql/catalyst/planning/patterns.scala  | 13 -------------
 .../sql/hive/execution/HiveComparisonTest.scala | 14 +++++++++++---
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6a1d83ad2f1af..4544b32958c7e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -168,16 +168,3 @@ object Unions {
     case other => other :: Nil
   }
 }
-
-/**
- * A pattern that matches (some) sorted operations and returns corresponding sorting orders.
- * Currently operations matched by this pattern are guaranteed to be sorted, but not all sorted
- * operations are matched by this pattern.
- */
-object SortedOperation {
-  // TODO (lian) detect more sorted operations
-  def unapply(plan: LogicalPlan): Option[Seq[SortOrder]] = plan match {
-    case FilteredOperation(_, Sort(order, _)) => Some(order)
-    case _ => None
-  }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 43939d6cf0c3b..1b5a132f9665d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -22,8 +22,8 @@ import java.io._
 import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 
 import org.apache.spark.sql.Logging
-import org.apache.spark.sql.catalyst.planning.SortedOperation
-import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive.test.TestHive
 
@@ -129,10 +129,18 @@ abstract class HiveComparisonTest
   protected def prepareAnswer(
     hiveQuery: TestHive.type#HiveQLQueryExecution,
     answer: Seq[String]): Seq[String] = {
+
+    def isSorted(plan: LogicalPlan): Boolean = plan match {
+      case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
+      case PhysicalOperation(_, _, Sort(_, _)) => true
+      case _ => plan.children.iterator.map(isSorted).exists(_ == true)
+    }
+
     val orderedAnswer = hiveQuery.logical match {
       // Clean out non-deterministic time schema info.
       case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
-      case _: ExplainCommand | SortedOperation(_) => answer
+      case _: ExplainCommand => answer
+      case plan if isSorted(plan) => answer
       case _ => answer.sorted
     }
     orderedAnswer.map(cleanPaths)