@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
 import org.apache.spark.{TaskContext, SparkException}
+import org.apache.spark.util.MutablePair

 /* Implicits */
 import scala.collection.JavaConversions._
@@ -190,27 +191,34 @@ case class HiveTableScan(
         Iterator.empty
       } else {
         val mutableRow = new GenericMutableRow(attributes.length)
+        val mutablePair = new MutablePair[Any, Array[String]]()
         val buffered = iterator.buffered
+
+        // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
+        // matching are avoided intentionally.
         val rowsAndPartitionKeys = buffered.head match {
-          case Array(_, _) =>
-            buffered.map { case Array(deserializedRow, partitionKeys: Array[String]) =>
-              (deserializedRow, partitionKeys)
+          // With partition keys
+          case _: Array[Any] =>
+            buffered.map { case array: Array[Any] =>
+              val deserializedRow = array(0)
+              val partitionKeys = array(1).asInstanceOf[Array[String]]
+              mutablePair.update(deserializedRow, partitionKeys)
             }

+          // Without partition keys
           case _ =>
-            buffered.map {
-              (_, Array.empty[String])
+            val emptyPartitionKeys = Array.empty[String]
+            buffered.map { deserializedRow =>
+              mutablePair.update(deserializedRow, emptyPartitionKeys)
             }
         }

-        rowsAndPartitionKeys.map { case (deserializedRow, partitionKeys) =>
+        rowsAndPartitionKeys.map { pair =>
           var i = 0
-
           while (i < attributes.length) {
-            mutableRow(i) = attributeFunctions(i)(deserializedRow, partitionKeys)
+            mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
             i += 1
           }
-
           mutableRow: Row
         }
       }
0 commit comments