From bd6894171c537627c09d04451ca1725dfc8228ae Mon Sep 17 00:00:00 2001 From: Emad Soroush Date: Sat, 19 Apr 2014 14:11:08 -0700 Subject: [PATCH 1/8] test code commit --- .../apache/spark/sql/hive/hiveOperators.scala | 153 ++++++++++++++---- 1 file changed, 119 insertions(+), 34 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index 821fb22112f87..f799b068929bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive} import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc} +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.objectinspector._ @@ -30,14 +31,18 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharOb import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ +import org.apache.spark._ +import org.apache.spark.rdd.ParallelCollectionRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} import org.apache.spark.sql.execution._ import org.apache.spark.{SparkHiveHadoopWriter, TaskContext, SparkException} - /* Implicits */ import scala.collection.JavaConversions._ +import scala.collection.mutable +/* java library */ +import java.util.ArrayList /** * The Hive table scan operator. Column and partition pruning are both handled. @@ -69,8 +74,15 @@ case class HiveTableScan( } @transient - val hadoopReader = new HadoopTableReader(relation.tableDesc, sc) - + val hadoopReader = { + val cNames: ArrayList[Integer] = getNeededColumnIDs() + if (cNames.size() != 0) { + ColumnProjectionUtils.appendReadColumnIDs(sc.hiveconf,cNames) + } else { + ColumnProjectionUtils.setFullyReadColumns(sc.hiveconf) + } + new HadoopTableReader(relation.tableDesc, sc) + } /** * The hive object inspector for this table, which can be used to extract values from the * serialized row representation. @@ -79,30 +91,55 @@ case class HiveTableScan( lazy val objectInspector = relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector] + /** attempt to retrieve a list of column ids that is required ... used by ColumnProjectionUtils */ + private def getNeededColumnIDs() : ArrayList[Integer] = { + val names: ArrayList[Integer] = new ArrayList[Integer]() + var i = 0 + val len = relation.attributes.length + while(i < len) { + for(a <- attributes) { + if(a.name == relation.attributes(i).name) { + names.add(i) + + } + } + i += 1 + } + names + } /** * Functions that extract the requested attributes from the hive output. Partitioned values are * casted from string to its declared data type. */ @transient protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { - attributes.map { a => + val len = attributes.length + /** newList ArrayBuffer + while loop created to simulate functionality of attributes.map(...) .... 
performance reason */ + val newList = mutable.ArrayBuffer[(Any, Array[String]) => Any]() + var i = 0 + while (i < len) { + val a: Attribute = attributes(i) val ordinal = relation.partitionKeys.indexOf(a) if (ordinal >= 0) { - (_: Any, partitionKeys: Array[String]) => { - val value = partitionKeys(ordinal) - val dataType = relation.partitionKeys(ordinal).dataType - castFromString(value, dataType) - } + newList += ( (_: Any, partitionKeys: Array[String]) => { + val value = partitionKeys(ordinal) + val dataType = relation.partitionKeys(ordinal).dataType + castFromString(value, dataType) + }) } else { val ref = objectInspector.getAllStructFieldRefs .find(_.getFieldName == a.name) .getOrElse(sys.error(s"Can't find attribute $a")) - (row: Any, _: Array[String]) => { - val data = objectInspector.getStructFieldData(row, ref) - unwrapData(data, ref.getFieldObjectInspector) - } + newList += ( (row: Any, _: Array[String]) => { + val data = objectInspector.getStructFieldData(row, ref) + unwrapData(data, ref.getFieldObjectInspector) + }) } + i+=1 } + + newList + } private def castFromString(value: String, dataType: DataType) = { @@ -123,40 +160,87 @@ case class HiveTableScan( * @return Partitions that are involved in the query plan. */ private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { + + /** mutable row implementation to avoid creating row instance at each iteration inside the while loop. */ + val row = new GenericMutableRow(attributes.length) + boundPruningPred match { case None => partitions case Some(shouldKeep) => partitions.filter { part => - val dataTypes = relation.partitionKeys.map(_.dataType) - val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield { - castFromString(value, dataType) - } + val castedValues = mutable.ArrayBuffer[Any]() + var i = 0 + var len = relation.partitionKeys.length + val iter: Iterator[String] = part.getValues.iterator + while (i < len) { + castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType) + i += 1 + } // Only partitioned values are needed here, since the predicate has already been bound to // partition key attribute references. - val row = new GenericRow(castedValues.toArray) - shouldKeep.eval(row).asInstanceOf[Boolean] + //val row = new GenericRow(castedValues.toArray) + i = 0 + len = castedValues.length + while (i < len) { + /** castedValues represents columns in the row */ + castedValues(i) match { + case n: String if n.toLowerCase == "null" => row.setNullAt(i) + case n: Boolean => row.setBoolean(i,n) + case n: Byte => row.setByte(i,n) + case n: Double => row.setDouble(i,n) + case n: Float => row.setFloat(i,n) + case n: Int => row.setInt(i,n) + case n: Long => row.setLong(i,n) + case n: String => row.setString(i,n) + case n: Short => row.setShort(i,n) + case other => row.update(i,other) + } + i += 1 + } + shouldKeep.eval(row).asInstanceOf[Boolean] } } } def execute() = { - inputRdd.map { row => - val values = row match { - case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) => - attributeFunctions.map(_(deserializedRow, partitionKeys)) - case deserializedRow: AnyRef => - attributeFunctions.map(_(deserializedRow, Array.empty)) + /** + * mutableRow is GenericMutableRow type and only created once. + * mutableRow is upadted at each iteration inside the while loop. + */ + val mutableRow = new GenericMutableRow(attributes.length) + var i = 0 + + var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => { + /** rddBuffer keeps track of all the transformed rows ... 
needed later to create finalRdd */ + val rddBuffer = mutable.ArrayBuffer[Row]() + while (iter.hasNext) { + val row = iter.next() + val values = row match { + case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) => + attributeFunctions.map(_(deserializedRow, partitionKeys)) + case deserializedRow: AnyRef => + attributeFunctions.map(_(deserializedRow, Array.empty)) + } + i = 0 + val len = values.length + while ( i < len ) { + values(i) match { + case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i) + case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => mutableRow.update(i,varchar.getValue) + case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal => + mutableRow.update(i,BigDecimal(decimal.bigDecimalValue)) + case other => mutableRow.update(i,other) + } + i += 1 + } + rddBuffer += mutableRow } - buildRow(values.map { - case n: String if n.toLowerCase == "null" => null - case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => varchar.getValue - case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal => - BigDecimal(decimal.bigDecimalValue) - case other => other - }) - } + rddBuffer + } ) + /** finalRdd ... equivelant to Rdd generated from inputRdd.map(...) */ + val finalRdd = inputRdd.context.makeRDD(res(0)) + finalRdd } - def output = attributes } @@ -174,6 +258,7 @@ case class InsertIntoHiveTable( private def newSerializer(tableDesc: TableDesc): Serializer = { val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] + serializer.initialize(null, tableDesc.getProperties) serializer } From ddc1c2398deb56fdc08a36cc032c329ccdedc73b Mon Sep 17 00:00:00 2001 From: esoroush Date: Sat, 19 Apr 2014 14:30:52 -0700 Subject: [PATCH 2/8] code task 2 --- .../main/scala/org/apache/spark/sql/hive/hiveOperators.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index f799b068929bf..53f0059b46776 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -31,8 +31,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharOb import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ -import org.apache.spark._ -import org.apache.spark.rdd.ParallelCollectionRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} @@ -258,7 +256,6 @@ case class InsertIntoHiveTable( private def newSerializer(tableDesc: TableDesc): Serializer = { val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] - serializer.initialize(null, tableDesc.getProperties) serializer } From e492069d08ea10a9cfc994684aee79cbd5e420e2 Mon Sep 17 00:00:00 2001 From: esoroush Date: Mon, 21 Apr 2014 12:42:09 -0700 Subject: [PATCH 3/8] tab characters correction --- .../apache/spark/sql/hive/hiveOperators.scala | 57 +++++++++---------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index 53f0059b46776..5017c3bb9c26b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -95,13 +95,12 @@ case 
class HiveTableScan( var i = 0 val len = relation.attributes.length while(i < len) { - for(a <- attributes) { - if(a.name == relation.attributes(i).name) { - names.add(i) - - } - } - i += 1 + for(a <- attributes) { + if(a.name == relation.attributes(i).name) { + names.add(i) + } + } + i += 1 } names } @@ -135,9 +134,7 @@ case class HiveTableScan( } i+=1 } - newList - } private def castFromString(value: String, dataType: DataType) = { @@ -166,36 +163,36 @@ case class HiveTableScan( case None => partitions case Some(shouldKeep) => partitions.filter { part => val castedValues = mutable.ArrayBuffer[Any]() - var i = 0 + var i = 0 var len = relation.partitionKeys.length val iter: Iterator[String] = part.getValues.iterator - while (i < len) { - castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType) + while (i < len) { + castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType) i += 1 - } + } // Only partitioned values are needed here, since the predicate has already been bound to // partition key attribute references. //val row = new GenericRow(castedValues.toArray) i = 0 len = castedValues.length - while (i < len) { - /** castedValues represents columns in the row */ - castedValues(i) match { - case n: String if n.toLowerCase == "null" => row.setNullAt(i) - case n: Boolean => row.setBoolean(i,n) + while (i < len) { + /** castedValues represents columns in the row */ + castedValues(i) match { + case n: String if n.toLowerCase == "null" => row.setNullAt(i) + case n: Boolean => row.setBoolean(i,n) case n: Byte => row.setByte(i,n) case n: Double => row.setDouble(i,n) case n: Float => row.setFloat(i,n) case n: Int => row.setInt(i,n) - case n: Long => row.setLong(i,n) - case n: String => row.setString(i,n) - case n: Short => row.setShort(i,n) - case other => row.update(i,other) - } - i += 1 - } - shouldKeep.eval(row).asInstanceOf[Boolean] + case n: Long => row.setLong(i,n) + case n: String => row.setString(i,n) + case n: Short => row.setShort(i,n) + case other => row.update(i,other) + } + i += 1 + } + shouldKeep.eval(row).asInstanceOf[Boolean] } } } @@ -224,17 +221,17 @@ case class HiveTableScan( while ( i < len ) { values(i) match { case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i) - case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => mutableRow.update(i,varchar.getValue) + case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => mutableRow.update(i,varchar.getValue) case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal => mutableRow.update(i,BigDecimal(decimal.bigDecimalValue)) - case other => mutableRow.update(i,other) - } + case other => mutableRow.update(i,other) + } i += 1 } rddBuffer += mutableRow } rddBuffer - } ) + }) /** finalRdd ... equivelant to Rdd generated from inputRdd.map(...) 
*/ val finalRdd = inputRdd.context.makeRDD(res(0)) finalRdd From ccb66bad478b0b284ac90da69d2ee633e0397b7a Mon Sep 17 00:00:00 2001 From: esoroush Date: Mon, 21 Apr 2014 13:13:27 -0700 Subject: [PATCH 4/8] tab characters correction --- .../apache/spark/sql/hive/hiveOperators.scala | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index 5017c3bb9c26b..b125c103cbe00 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -97,7 +97,7 @@ case class HiveTableScan( while(i < len) { for(a <- attributes) { if(a.name == relation.attributes(i).name) { - names.add(i) + names.add(i) } } i += 1 @@ -111,7 +111,9 @@ case class HiveTableScan( @transient protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { val len = attributes.length - /** newList ArrayBuffer + while loop created to simulate functionality of attributes.map(...) .... performance reason */ + /** newList ArrayBuffer + while loop created to simulate functionality of + * attributes.map(...) .... performance reason. + */ val newList = mutable.ArrayBuffer[(Any, Array[String]) => Any]() var i = 0 while (i < len) { @@ -122,7 +124,8 @@ case class HiveTableScan( val value = partitionKeys(ordinal) val dataType = relation.partitionKeys(ordinal).dataType castFromString(value, dataType) - }) + } + ) } else { val ref = objectInspector.getAllStructFieldRefs .find(_.getFieldName == a.name) @@ -130,7 +133,8 @@ case class HiveTableScan( newList += ( (row: Any, _: Array[String]) => { val data = objectInspector.getStructFieldData(row, ref) unwrapData(data, ref.getFieldObjectInspector) - }) + } + ) } i+=1 } @@ -155,8 +159,9 @@ case class HiveTableScan( * @return Partitions that are involved in the query plan. */ private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { - - /** mutable row implementation to avoid creating row instance at each iteration inside the while loop. */ + /** mutable row implementation to avoid creating row instance at + * each iteration inside the while loop. + */ val row = new GenericMutableRow(attributes.length) boundPruningPred match { @@ -171,8 +176,8 @@ case class HiveTableScan( i += 1 } - // Only partitioned values are needed here, since the predicate has already been bound to - // partition key attribute references. + //Only partitioned values are needed here, since the predicate has already been bound to + //partition key attribute references. //val row = new GenericRow(castedValues.toArray) i = 0 len = castedValues.length @@ -206,7 +211,9 @@ case class HiveTableScan( var i = 0 var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => { - /** rddBuffer keeps track of all the transformed rows ... needed later to create finalRdd */ + /** rddBuffer keeps track of all the transformed rows. 
+ * needed later to create finalRdd + */ val rddBuffer = mutable.ArrayBuffer[Row]() while (iter.hasNext) { val row = iter.next() From c3b6d12084c41ddb01839dbb3fd1defbec6849cf Mon Sep 17 00:00:00 2001 From: esoroush Date: Mon, 21 Apr 2014 14:28:36 -0700 Subject: [PATCH 5/8] tab characters correction-3 --- .../apache/spark/sql/hive/hiveOperators.scala | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index b125c103cbe00..cbb397ec84ace 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -163,33 +163,30 @@ case class HiveTableScan( * each iteration inside the while loop. */ val row = new GenericMutableRow(attributes.length) - boundPruningPred match { case None => partitions case Some(shouldKeep) => partitions.filter { part => val castedValues = mutable.ArrayBuffer[Any]() var i = 0 - var len = relation.partitionKeys.length + var len = relation.partitionKeys.length val iter: Iterator[String] = part.getValues.iterator while (i < len) { castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType) i += 1 } - - //Only partitioned values are needed here, since the predicate has already been bound to - //partition key attribute references. - //val row = new GenericRow(castedValues.toArray) + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. i = 0 len = castedValues.length - while (i < len) { - /** castedValues represents columns in the row */ + while (i < len) { + //castedValues represents columns in the row */ castedValues(i) match { case n: String if n.toLowerCase == "null" => row.setNullAt(i) case n: Boolean => row.setBoolean(i,n) - case n: Byte => row.setByte(i,n) - case n: Double => row.setDouble(i,n) - case n: Float => row.setFloat(i,n) - case n: Int => row.setInt(i,n) + case n: Byte => row.setByte(i,n) + case n: Double => row.setDouble(i,n) + case n: Float => row.setFloat(i,n) + case n: Int => row.setInt(i,n) case n: Long => row.setLong(i,n) case n: String => row.setString(i,n) case n: Short => row.setShort(i,n) @@ -228,7 +225,8 @@ case class HiveTableScan( while ( i < len ) { values(i) match { case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i) - case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => mutableRow.update(i,varchar.getValue) + case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => + mutableRow.update(i,varchar.getValue) case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal => mutableRow.update(i,BigDecimal(decimal.bigDecimalValue)) case other => mutableRow.update(i,other) @@ -239,9 +237,9 @@ case class HiveTableScan( } rddBuffer }) - /** finalRdd ... equivelant to Rdd generated from inputRdd.map(...) */ - val finalRdd = inputRdd.context.makeRDD(res(0)) - finalRdd + /** finalRdd ... equivelant to Rdd generated from inputRdd.map(...) 
*/ + val finalRdd = inputRdd.context.makeRDD(res(0)) + finalRdd } def output = attributes } From bb74abd765825a107d52faf6bd1cc6196f33d094 Mon Sep 17 00:00:00 2001 From: esoroush Date: Mon, 21 Apr 2014 14:32:37 -0700 Subject: [PATCH 6/8] tab characters correction-4 --- .../main/scala/org/apache/spark/sql/hive/hiveOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index cbb397ec84ace..525a6ab8e2981 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -178,8 +178,8 @@ case class HiveTableScan( // partition key attribute references. i = 0 len = castedValues.length + // castedValues represents columns in the row. while (i < len) { - //castedValues represents columns in the row */ castedValues(i) match { case n: String if n.toLowerCase == "null" => row.setNullAt(i) case n: Boolean => row.setBoolean(i,n) From 440a3e63292bc25e3a6531fc38dab1bfd1745cd5 Mon Sep 17 00:00:00 2001 From: esoroush Date: Fri, 25 Apr 2014 14:01:22 -0700 Subject: [PATCH 7/8] more refinement ... now the code passes the unit tests --- .../apache/spark/sql/hive/hiveOperators.scala | 176 ++++++++++-------- 1 file changed, 94 insertions(+), 82 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index 525a6ab8e2981..613ee24eb4ef5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.metadata.Partition import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive} import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc} import org.apache.hadoop.hive.serde2.ColumnProjectionUtils @@ -39,9 +40,11 @@ import org.apache.spark.{SparkHiveHadoopWriter, TaskContext, SparkException} /* Implicits */ import scala.collection.JavaConversions._ import scala.collection.mutable +import scala.util.control._ /* java library */ import java.util.ArrayList + /** * The Hive table scan operator. Column and partition pruning are both handled. * @@ -72,6 +75,9 @@ case class HiveTableScan( } @transient + val hadoopReader = new HadoopTableReader(relation.tableDesc, sc) + /* TODO:attempt to retrieve a list of column ids that is required + // require more work ... val hadoopReader = { val cNames: ArrayList[Integer] = getNeededColumnIDs() if (cNames.size() != 0) { @@ -80,7 +86,8 @@ case class HiveTableScan( ColumnProjectionUtils.setFullyReadColumns(sc.hiveconf) } new HadoopTableReader(relation.tableDesc, sc) - } + }*/ + /** * The hive object inspector for this table, which can be used to extract values from the * serialized row representation. @@ -90,56 +97,46 @@ case class HiveTableScan( relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector] /** attempt to retrieve a list of column ids that is required ... 
used by ColumnProjectionUtils */ - private def getNeededColumnIDs() : ArrayList[Integer] = { + private def getNeededColumnIDs() : ArrayList[Integer] = { val names: ArrayList[Integer] = new ArrayList[Integer]() var i = 0 val len = relation.attributes.length while(i < len) { - for(a <- attributes) { + for(a <- attributes) { if(a.name == relation.attributes(i).name) { names.add(i) } } - i += 1 - } + i += 1 + } names - } + } /** * Functions that extract the requested attributes from the hive output. Partitioned values are * casted from string to its declared data type. */ @transient protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { - val len = attributes.length - /** newList ArrayBuffer + while loop created to simulate functionality of - * attributes.map(...) .... performance reason. - */ - val newList = mutable.ArrayBuffer[(Any, Array[String]) => Any]() - var i = 0 - while (i < len) { - val a: Attribute = attributes(i) + attributes.map { a => val ordinal = relation.partitionKeys.indexOf(a) if (ordinal >= 0) { - newList += ( (_: Any, partitionKeys: Array[String]) => { - val value = partitionKeys(ordinal) - val dataType = relation.partitionKeys(ordinal).dataType - castFromString(value, dataType) - } - ) + (_: Any, partitionKeys: Array[String]) => { + val value = partitionKeys(ordinal) + val dataType = relation.partitionKeys(ordinal).dataType + castFromString(value, dataType) + } } else { val ref = objectInspector.getAllStructFieldRefs .find(_.getFieldName == a.name) .getOrElse(sys.error(s"Can't find attribute $a")) - newList += ( (row: Any, _: Array[String]) => { - val data = objectInspector.getStructFieldData(row, ref) - unwrapData(data, ref.getFieldObjectInspector) - } - ) + (row: Any, _: Array[String]) => { + val data = objectInspector.getStructFieldData(row, ref) + unwrapData(data, ref.getFieldObjectInspector) + } } - i+=1 } - newList } + private def castFromString(value: String, dataType: DataType) = { Cast(Literal(value), dataType).eval(null) @@ -162,13 +159,19 @@ case class HiveTableScan( /** mutable row implementation to avoid creating row instance at * each iteration inside the while loop. */ - val row = new GenericMutableRow(attributes.length) - boundPruningPred match { - case None => partitions - case Some(shouldKeep) => partitions.filter { part => - val castedValues = mutable.ArrayBuffer[Any]() + var row = new GenericMutableRow(relation.partitionKeys.length) + if (boundPruningPred == None) { + partitions + } else { + val shouldKeep:Expression = boundPruningPred.get + val partitionSize = partitions.length + var index = 0 + var filterPartition = mutable.ListBuffer[HivePartition]() + while (index < partitionSize) { + val part = partitions(index) var i = 0 var len = relation.partitionKeys.length + var castedValues = mutable.ListBuffer[Any]() val iter: Iterator[String] = part.getValues.iterator while (i < len) { castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType) @@ -180,66 +183,75 @@ case class HiveTableScan( len = castedValues.length // castedValues represents columns in the row. 
while (i < len) { - castedValues(i) match { - case n: String if n.toLowerCase == "null" => row.setNullAt(i) - case n: Boolean => row.setBoolean(i,n) - case n: Byte => row.setByte(i,n) - case n: Double => row.setDouble(i,n) - case n: Float => row.setFloat(i,n) - case n: Int => row.setInt(i,n) - case n: Long => row.setLong(i,n) - case n: String => row.setString(i,n) - case n: Short => row.setShort(i,n) - case other => row.update(i,other) + val n = castedValues(i) + if (n.isInstanceOf[String]) { + if (n.asInstanceOf[String].toLowerCase == "null") { + row.setNullAt(i) + } else { + row.setString(i,n.asInstanceOf[String]) + } + } + else { + row.update(i,n) } i += 1 } - shouldKeep.eval(row).asInstanceOf[Boolean] + if (shouldKeep.eval(row).asInstanceOf[Boolean]) + filterPartition += part + index += 1 } + filterPartition } } - - def execute() = { - /** - * mutableRow is GenericMutableRow type and only created once. - * mutableRow is upadted at each iteration inside the while loop. - */ - val mutableRow = new GenericMutableRow(attributes.length) - var i = 0 - - var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => { - /** rddBuffer keeps track of all the transformed rows. - * needed later to create finalRdd - */ - val rddBuffer = mutable.ArrayBuffer[Row]() - while (iter.hasNext) { - val row = iter.next() - val values = row match { - case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) => - attributeFunctions.map(_(deserializedRow, partitionKeys)) - case deserializedRow: AnyRef => - attributeFunctions.map(_(deserializedRow, Array.empty)) - } - i = 0 - val len = values.length - while ( i < len ) { - values(i) match { - case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i) - case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => - mutableRow.update(i,varchar.getValue) - case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal => - mutableRow.update(i,BigDecimal(decimal.bigDecimalValue)) - case other => mutableRow.update(i,other) + /** + * A custom Iterator class passed to mapPartitions() at execute() method. + */ + class MyIterator(iter: Iterator[_],mutableRow: GenericMutableRow) extends Iterator[Row] { + + def hasNext = iter.hasNext + def next = { + val row = iter.next() + val values = row match { + case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) => + attributeFunctions.map(_(deserializedRow, partitionKeys)) + case deserializedRow: AnyRef => + attributeFunctions.map(_(deserializedRow, Array.empty)) + } + var i = 0 + val len = values.length + while ( i < len ) { + val n = values(i) + if(n.isInstanceOf[String]){ + if(n.asInstanceOf[String].toLowerCase == "null"){ + mutableRow.setNullAt(i) + }else{ + mutableRow.setString(i,n.asInstanceOf[String]) } - i += 1 } - rddBuffer += mutableRow + else if(n.isInstanceOf[HiveVarchar]){ + mutableRow.update(i,n.asInstanceOf[HiveVarchar].getValue) + } + else if(n.isInstanceOf[HiveDecimal]){ + mutableRow.update(i,BigDecimal(n.asInstanceOf[HiveDecimal].bigDecimalValue)) + } + else{ + mutableRow.update(i,n) + } + i += 1 } - rddBuffer + mutableRow + } + } + + def execute() = { + /** + * mutableRow is GenericMutableRow type and only created once per partition. + */ + inputRdd.mapPartitions((iter: Iterator[_]) => { + var mutableRow = new GenericMutableRow(attributes.length) + new MyIterator(iter,mutableRow) }) - /** finalRdd ... equivelant to Rdd generated from inputRdd.map(...) 
*/ - val finalRdd = inputRdd.context.makeRDD(res(0)) - finalRdd + } def output = attributes } From 9d2c6ca513580749ca2fd4cd60076fe83bc166aa Mon Sep 17 00:00:00 2001 From: esoroush Date: Fri, 25 Apr 2014 14:13:01 -0700 Subject: [PATCH 8/8] minor refinements ... tab deletion ... --- .../apache/spark/sql/hive/hiveOperators.scala | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index 613ee24eb4ef5..ca5bb1a1c6971 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -76,17 +76,19 @@ case class HiveTableScan( @transient val hadoopReader = new HadoopTableReader(relation.tableDesc, sc) - /* TODO:attempt to retrieve a list of column ids that is required - // require more work ... - val hadoopReader = { - val cNames: ArrayList[Integer] = getNeededColumnIDs() - if (cNames.size() != 0) { - ColumnProjectionUtils.appendReadColumnIDs(sc.hiveconf,cNames) - } else { - ColumnProjectionUtils.setFullyReadColumns(sc.hiveconf) - } - new HadoopTableReader(relation.tableDesc, sc) - }*/ + /** + * attempt to retrieve a list of column ids that is required + * TODO: require more work ... + * val hadoopReader = { + * val cNames: ArrayList[Integer] = getNeededColumnIDs() + * if (cNames.size() != 0) { + * ColumnProjectionUtils.appendReadColumnIDs(sc.hiveconf,cNames) + * } else { + * ColumnProjectionUtils.setFullyReadColumns(sc.hiveconf) + * } + * new HadoopTableReader(relation.tableDesc, sc) + * } + */ /** * The hive object inspector for this table, which can be used to extract values from the @@ -183,7 +185,7 @@ case class HiveTableScan( len = castedValues.length // castedValues represents columns in the row. while (i < len) { - val n = castedValues(i) + val n = castedValues(i) if (n.isInstanceOf[String]) { if (n.asInstanceOf[String].toLowerCase == "null") { row.setNullAt(i) @@ -196,9 +198,10 @@ case class HiveTableScan( } i += 1 } - if (shouldKeep.eval(row).asInstanceOf[Boolean]) + if (shouldKeep.eval(row).asInstanceOf[Boolean]) { filterPartition += part - index += 1 + } + index += 1 } filterPartition }
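
A note on the column-pruning TODO left in PATCH 8/8: the commented-out block preserves the ColumnProjectionUtils calls introduced in PATCH 1/8 but never wires them back in. Assuming the members already defined in these patches (attributes, relation, sc.hiveconf, HadoopTableReader), a minimal sketch of that wiring could look like the code below; the Set-based helper is an illustrative stand-in for the getNeededColumnIDs() while loop, not code from the patches.

    // Sketch only: compute the ordinals of the projected columns and hand them to
    // Hive before the table reader is built, which is what the TODO intends.
    private def neededColumnIDs: java.util.ArrayList[Integer] = {
      val wanted = attributes.map(_.name).toSet      // columns the query actually projects
      val ids = new java.util.ArrayList[Integer]()
      relation.attributes.zipWithIndex.foreach {
        case (attr, ordinal) if wanted.contains(attr.name) => ids.add(ordinal)
        case _ =>                                    // column not projected; skip it
      }
      ids
    }

    @transient
    val hadoopReader = {
      val ids = neededColumnIDs
      if (!ids.isEmpty) {
        // Ask Hive's SerDe / input format to materialize only the projected columns.
        ColumnProjectionUtils.appendReadColumnIDs(sc.hiveconf, ids)
      } else {
        // No usable projection: fall back to reading every column.
        ColumnProjectionUtils.setFullyReadColumns(sc.hiveconf)
      }
      new HadoopTableReader(relation.tableDesc, sc)
    }

Whether appending read-column IDs to a shared HiveConf stays safe when several scans run in the same session is presumably the "require more work" part of the TODO, so this is a starting point rather than a drop-in fix.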
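
The prunePartitions rewrite in PATCH 7/8 spends two while loops and an isInstanceOf chain on what is essentially one operation per partition: cast each partition-key string with castFromString and evaluate the bound predicate against a single reused row. Using only names that already appear in these patches (HivePartition, Expression, GenericMutableRow, Cast, Literal, relation.partitionKeys), the per-partition test reduces to roughly the sketch below; the patch additionally special-cases the literal string "null" and uses typed setters, which is omitted here for brevity.

    // Sketch of the per-partition check: build one row of casted partition-key
    // values and ask the bound pruning predicate whether to keep the partition.
    private def keepPartition(part: HivePartition, shouldKeep: Expression): Boolean = {
      val keys = relation.partitionKeys
      val row = new GenericMutableRow(keys.length)
      val values = part.getValues.iterator           // partition-key values, as strings
      var i = 0
      while (i < keys.length) {
        // Same conversion the patches call castFromString.
        row.update(i, Cast(Literal(values.next), keys(i).dataType).eval(null))
        i += 1
      }
      shouldKeep.eval(row).asInstanceOf[Boolean]
    }

With a helper like this, the filter itself stays a one-liner: partitions.filter(keepPartition(_, shouldKeep)).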
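
The change that matters most in PATCH 7/8 is execute(). The PATCH 1/8 version ran inputRdd.context.runJob to pull every transformed row into a driver-side buffer and then rebuilt an RDD with makeRDD (and only from the first partition's result, res(0)); because it also appended the same GenericMutableRow instance on every iteration, all rows buffered by a task ended up aliasing one object. The mapPartitions + MyIterator version keeps the scan lazy and distributed and allocates one mutable row per partition. The generic sketch below shows the same pattern on a plain RDD; RawRecord and the Array[Any] row are hypothetical stand-ins, not types from the patches.

    import org.apache.spark.rdd.RDD

    object MutableRowScanSketch {
      type RawRecord = Array[String]                 // stand-in for a deserialized Hive row

      // Transform records lazily inside each partition, reusing one output buffer,
      // instead of collecting everything to the driver and re-parallelizing it.
      def scan(input: RDD[RawRecord], numColumns: Int): RDD[Array[Any]] =
        input.mapPartitions { iter =>
          // One buffer per partition, like the per-partition GenericMutableRow.
          val reused = new Array[Any](numColumns)
          iter.map { record =>
            var i = 0
            while (i < numColumns) {
              // Mirror the patches' convention that the string "null" means a real null.
              reused(i) = if (record(i) != null && record(i).equalsIgnoreCase("null")) null else record(i)
              i += 1
            }
            reused
          }
        }
    }

The catch with a reused buffer is that any downstream consumer that holds on to rows (collect, sorting, caching) must copy them first; the patches assume row-at-a-time consumption, and that assumption should travel with this pattern if it is extended.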