/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import scala.collection.JavaConversions._

import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive._
import org.apache.spark.util.MutablePair

/**
 * :: DeveloperApi ::
 * The Hive table scan operator. Column and partition pruning are both handled.
 *
 * @param attributes Attributes to be fetched from the Hive table.
 * @param relation The Hive table to be scanned.
 * @param partitionPruningPred An optional partition pruning predicate for partitioned tables.
 */
@DeveloperApi
case class HiveTableScan(
    attributes: Seq[Attribute],
    relation: MetastoreRelation,
    partitionPruningPred: Option[Expression])(
    @transient val context: HiveContext)
  extends LeafNode
  with HiveInspectors {

  require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
    "Partition pruning predicates only supported for partitioned tables.")

  // Bind all partition key attribute references in the partition pruning predicate for later
  // evaluation.
  private[this] val boundPruningPred = partitionPruningPred.map { pred =>
    require(
      pred.dataType == BooleanType,
      s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")

    BindReferences.bindReference(pred, relation.partitionKeys)
  }

  @transient
  private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context)

  /**
   * The Hive object inspector for this table, which can be used to extract values from the
   * serialized row representation.
   */
  @transient
  private[this] lazy val objectInspector =
    relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]

  /**
   * Functions that extract the requested attributes from the Hive output. Partitioned values are
   * cast from strings to their declared data types.
   */
  @transient
  protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
    attributes.map { a =>
      val ordinal = relation.partitionKeys.indexOf(a)
      if (ordinal >= 0) {
        val dataType = relation.partitionKeys(ordinal).dataType
        (_: Any, partitionKeys: Array[String]) => {
          castFromString(partitionKeys(ordinal), dataType)
        }
      } else {
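        // Non-partition column: locate the corresponding field in the table's object inspector
        // by name, so its value can be extracted from each deserialized row.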
        val ref = objectInspector.getAllStructFieldRefs
          .find(_.getFieldName == a.name)
          .getOrElse(sys.error(s"Can't find attribute $a"))
        val fieldObjectInspector = ref.getFieldObjectInspector

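        // Hive wrapper types that Catalyst does not consume directly (varchar, decimal) are
        // converted to plain Scala values; all other values pass through unchanged.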
        val unwrapHiveData = fieldObjectInspector match {
          case _: HiveVarcharObjectInspector =>
            (value: Any) => value.asInstanceOf[HiveVarchar].getValue
          case _: HiveDecimalObjectInspector =>
            (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
          case _ =>
            identity[Any] _
        }

        (row: Any, _: Array[String]) => {
          val data = objectInspector.getStructFieldData(row, ref)
          val hiveData = unwrapData(data, fieldObjectInspector)
          if (hiveData != null) unwrapHiveData(hiveData) else null
        }
      }
    }
  }

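  // Partition key values arrive as strings; reuse Catalyst's Cast expression to convert them
  // into the partition column's declared data type.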
  private[this] def castFromString(value: String, dataType: DataType) = {
    Cast(Literal(value), dataType).eval(null)
  }

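  // Record which columns (and their types) this scan actually needs in the Hive configuration,
  // so that column-pruning-aware SerDes and record readers can skip the rest.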
  private def addColumnMetadataToConf(hiveConf: HiveConf) {
    // Specifies IDs and internal names of columns to be scanned.
    val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
    val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")

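    // When every output column is requested, mark the table as fully read; otherwise register
    // only the IDs of the pruned column set.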
    if (attributes.size == relation.output.size) {
      ColumnProjectionUtils.setFullyReadColumns(hiveConf)
    } else {
      ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
    }

    ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))

    // Specifies types and object inspectors of columns to be scanned.
    val structOI = ObjectInspectorUtils
      .getStandardObjectInspector(
        relation.tableDesc.getDeserializer.getObjectInspector,
        ObjectInspectorCopyOption.JAVA)
      .asInstanceOf[StructObjectInspector]

    val columnTypeNames = structOI
      .getAllStructFieldRefs
      .map(_.getFieldObjectInspector)
      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
      .mkString(",")

    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
    hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
  }

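  // Push the needed-column metadata into the shared Hive configuration as soon as the operator
  // is constructed.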
  addColumnMetadataToConf(context.hiveconf)

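  // The RDD backing this scan: a plain table scan for unpartitioned tables, or a scan over only
  // the partitions that survive pruning.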
  private def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
    hadoopReader.makeRDDForTable(relation.hiveQlTable)
  } else {
    hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
  }

  /**
   * Prunes partitions not involved in the query plan.
   *
   * @param partitions All partitions of the relation.
   * @return Partitions that are involved in the query plan.
   */
  private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
    boundPruningPred match {
      case None => partitions
      case Some(shouldKeep) => partitions.filter { part =>
        val dataTypes = relation.partitionKeys.map(_.dataType)
        val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
          castFromString(value, dataType)
        }

        // Only partitioned values are needed here, since the predicate has already been bound to
        // partition key attribute references.
        val row = new GenericRow(castedValues.toArray)
        shouldKeep.eval(row).asInstanceOf[Boolean]
      }
    }
  }

  override def execute() = {
    inputRdd.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
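        // Reuse a single mutable row and mutable pair across the whole partition to avoid
        // allocating one object per record on this hot path.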
        val mutableRow = new GenericMutableRow(attributes.length)
        val mutablePair = new MutablePair[Any, Array[String]]()
        val buffered = iterator.buffered

        // NOTE (lian): This is the critical path of the Hive table scan; unnecessary functional
        // style code and pattern matching are intentionally avoided.
        val rowsAndPartitionKeys = buffered.head match {
          // With partition keys
          case _: Array[Any] =>
            buffered.map { case array: Array[Any] =>
              val deserializedRow = array(0)
              val partitionKeys = array(1).asInstanceOf[Array[String]]
              mutablePair.update(deserializedRow, partitionKeys)
            }

          // Without partition keys
          case _ =>
            val emptyPartitionKeys = Array.empty[String]
            buffered.map { deserializedRow =>
              mutablePair.update(deserializedRow, emptyPartitionKeys)
            }
        }

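        // Fill the reused mutable row by applying each attribute function to the deserialized
        // row and its partition key values, one output column at a time.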
        rowsAndPartitionKeys.map { pair =>
          var i = 0
          while (i < attributes.length) {
            mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
            i += 1
          }
          mutableRow: Row
        }
      }
    }
  }

  override def output = attributes
}