@@ -233,7 +233,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {

// Sort with Limit clause causes failure.
"ctas",
"ctas_hadoop20"
"ctas_hadoop20",

// For timestamps inside an array, Hive's output format wraps the value in double quotes,
// while Spark SQL's output does not.
"udf_sort_array"
) ++ HiveShim.compatibilityBlackList

/**
@@ -861,6 +865,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"udf_minute",
"udf_modulo",
"udf_month",
"udf_named_struct",
"udf_negative",
"udf_not",
"udf_notequal",
@@ -894,6 +899,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"udf_stddev_pop",
"udf_stddev_samp",
"udf_string",
"udf_struct",
"udf_substring",
"udf_subtract",
"udf_sum",
185 changes: 122 additions & 63 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.{io => hadoopIo}
@@ -78,88 +80,103 @@ private[hive] trait HiveInspectors {
case c: Class[_] if c == classOf[java.lang.Object] => NullType
}

/** Converts hive types to native catalyst types. */
def unwrap(a: Any): Any = a match {
case null => null
case i: hadoopIo.IntWritable => i.get
case t: hadoopIo.Text => t.toString
case l: hadoopIo.LongWritable => l.get
case d: hadoopIo.DoubleWritable => d.get
case d: hiveIo.DoubleWritable => d.get
case s: hiveIo.ShortWritable => s.get
case b: hadoopIo.BooleanWritable => b.get
case b: hiveIo.ByteWritable => b.get
case b: hadoopIo.FloatWritable => b.get
case b: hadoopIo.BytesWritable => {
val bytes = new Array[Byte](b.getLength)
System.arraycopy(b.getBytes(), 0, bytes, 0, b.getLength)
bytes
}
case d: hiveIo.DateWritable => d.get
case t: hiveIo.TimestampWritable => t.getTimestamp
case b: hiveIo.HiveDecimalWritable => BigDecimal(b.getHiveDecimal().bigDecimalValue())
case list: java.util.List[_] => list.map(unwrap)
case map: java.util.Map[_,_] => map.map { case (k, v) => (unwrap(k), unwrap(v)) }.toMap
case array: Array[_] => array.map(unwrap).toSeq
case p: java.lang.Short => p
case p: java.lang.Long => p
case p: java.lang.Float => p
case p: java.lang.Integer => p
case p: java.lang.Double => p
case p: java.lang.Byte => p
case p: java.lang.Boolean => p
case str: String => str
case p: java.math.BigDecimal => p
case p: Array[Byte] => p
case p: java.sql.Date => p
case p: java.sql.Timestamp => p
}

def unwrapData(data: Any, oi: ObjectInspector): Any = oi match {
/**
* Converts Hive types to native Catalyst types.
* @param data the data in a Hive type
* @param oi the ObjectInspector associated with the Hive type
* @return the data converted into its Catalyst type
*/
def unwrap(data: Any, oi: ObjectInspector): Any = oi match {
case hvoi: HiveVarcharObjectInspector =>
if (data == null) null else hvoi.getPrimitiveJavaObject(data).getValue
case hdoi: HiveDecimalObjectInspector =>
if (data == null) null else BigDecimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue())
case pi: PrimitiveObjectInspector => pi.getPrimitiveJavaObject(data)
case li: ListObjectInspector =>
Option(li.getList(data))
.map(_.map(unwrapData(_, li.getListElementObjectInspector)).toSeq)
.map(_.map(unwrap(_, li.getListElementObjectInspector)).toSeq)
.orNull
case mi: MapObjectInspector =>
Option(mi.getMap(data)).map(
_.map {
case (k,v) =>
(unwrapData(k, mi.getMapKeyObjectInspector),
unwrapData(v, mi.getMapValueObjectInspector))
(unwrap(k, mi.getMapKeyObjectInspector),
unwrap(v, mi.getMapValueObjectInspector))
}.toMap).orNull
case si: StructObjectInspector =>
val allRefs = si.getAllStructFieldRefs
new GenericRow(
allRefs.map(r =>
unwrapData(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
unwrap(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
}
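
A minimal sketch (not part of this patch) of how the new unwrap behaves with a primitive inspector; writableIntObjectInspector is a standard factory field from Hive's serde2 library and the value 42 is arbitrary:

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.io.IntWritable

// The inspector drives the conversion: for a plain primitive inspector, unwrap
// delegates to getPrimitiveJavaObject, so a writable int comes back as a boxed
// java.lang.Integer that Catalyst can consume directly.
val intOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector
val unwrapped = unwrap(new IntWritable(42), intOI) // yields 42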

/** Converts native catalyst types to the types expected by Hive */
def wrap(a: Any): AnyRef = a match {
case s: String => s: java.lang.String
case i: Int => i: java.lang.Integer
case b: Boolean => b: java.lang.Boolean
case f: Float => f: java.lang.Float
case d: Double => d: java.lang.Double
case l: Long => l: java.lang.Long
case l: Short => l: java.lang.Short
case l: Byte => l: java.lang.Byte
case b: BigDecimal => HiveShim.createDecimal(b.underlying())
case b: Array[Byte] => b
case d: java.sql.Date => d
case t: java.sql.Timestamp => t
case s: Seq[_] => seqAsJavaList(s.map(wrap))
case m: Map[_,_] =>
// Some UDFs seem to assume we pass in a HashMap.
val hashMap = new java.util.HashMap[AnyRef, AnyRef]()
hashMap.putAll(m.map { case (k, v) => wrap(k) -> wrap(v) })
hashMap
case null => null
/**
* Converts native Catalyst types to the types expected by Hive.
* @param a the value to be wrapped
* @param oi the ObjectInspector associated with the value returned by this function; it
* should be consistent with the inspectors returned by
* toInspector: DataType => ObjectInspector and
* toInspector: Expression => ObjectInspector
* @return the value wrapped in the form the given ObjectInspector expects
*/
def wrap(a: Any, oi: ObjectInspector): AnyRef = if (a == null) {
null
} else {
oi match {
case x: ConstantObjectInspector => x.getWritableConstantValue
case x: PrimitiveObjectInspector => a match {
// TODO what if x.preferWritable() == true? reuse the writable?
Contributor:
Seems that the Writable-related code was reverted after rebase?

Contributor (Author):
Yes, currently the oi should not be "preferWritable", as toInspector doesn't return such an inspector. Even if we returned a new instance of Writable here, it would be the same as what a preferWritable ObjectInspector does internally.
As you suggested, we don't want to dynamically check the oi type; I will keep that as a future improvement, along with reusing the writable object.
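
A rough sketch (purely illustrative, assuming a hypothetical cached field and helper name) of what such a writable-reusing branch could look like for the integer case:

import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector
import org.apache.hadoop.io.IntWritable

// Hypothetical future improvement: when the inspector prefers writables,
// overwrite one cached IntWritable in place instead of boxing a new Integer
// for every row.
private val reusedInt = new IntWritable()

def wrapIntReusingWritable(a: Int, oi: PrimitiveObjectInspector): AnyRef =
  if (oi.preferWritable()) {
    reusedInt.set(a) // reuse the cached writable
    reusedInt
  } else {
    a: java.lang.Integer // fall back to boxing, as the current code does
  }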

case s: String => s: java.lang.String
case i: Int => i: java.lang.Integer
case b: Boolean => b: java.lang.Boolean
case f: Float => f: java.lang.Float
case d: Double => d: java.lang.Double
case l: Long => l: java.lang.Long
case l: Short => l: java.lang.Short
case l: Byte => l: java.lang.Byte
case b: BigDecimal => HiveShim.createDecimal(b.underlying())
case b: Array[Byte] => b
case d: java.sql.Date => d
case t: java.sql.Timestamp => t
}
case x: StructObjectInspector =>
val fieldRefs = x.getAllStructFieldRefs
val row = a.asInstanceOf[Seq[_]]
val result = new java.util.ArrayList[AnyRef](fieldRefs.length)
var i = 0
while (i < fieldRefs.length) {
result.add(wrap(row(i), fieldRefs.get(i).getFieldObjectInspector))
i += 1
}

result
case x: ListObjectInspector =>
val list = new java.util.ArrayList[Object]
a.asInstanceOf[Seq[_]].foreach {
v => list.add(wrap(v, x.getListElementObjectInspector))
}
list
case x: MapObjectInspector =>
// Some UDFs seem to assume we pass in a HashMap.
val hashMap = new java.util.HashMap[AnyRef, AnyRef]()
hashMap.putAll(a.asInstanceOf[Map[_, _]].map {
case (k, v) =>
wrap(k, x.getMapKeyObjectInspector) -> wrap(v, x.getMapValueObjectInspector)
})

hashMap
}
}
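
A usage sketch (assumed caller code, not part of the diff) showing wrap converting a Catalyst Seq for a standard Hive list inspector; both factories below are standard Hive serde2 classes:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// A list-of-string inspector: wrap recurses into each element, so the Catalyst
// Seq comes back as the java.util.List that Hive UDFs expect.
val elemOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector
val listOI = ObjectInspectorFactory.getStandardListObjectInspector(elemOI)
val javaList = wrap(Seq("a", "b", "c"), listOI) // java.util.ArrayList containing "a", "b", "c"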

def wrap(
row: Seq[Any],
inspectors: Seq[ObjectInspector],
cache: Array[AnyRef]): Array[AnyRef] = {
var i = 0
while (i < inspectors.length) {
cache(i) = wrap(row(i), inspectors(i))
i += 1
}
cache
}

def toInspector(dataType: DataType): ObjectInspector = dataType match {
@@ -186,6 +203,48 @@ private[hive] trait HiveInspectors {
fields.map(f => f.name), fields.map(f => toInspector(f.dataType)))
}

def toInspector(expr: Expression): ObjectInspector = expr match {
case Literal(value: String, StringType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Int, IntegerType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Double, DoubleType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Boolean, BooleanType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Long, LongType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Float, FloatType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Short, ShortType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Byte, ByteType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: Array[Byte], BinaryType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: java.sql.Date, DateType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: java.sql.Timestamp, TimestampType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(value: BigDecimal, DecimalType) =>
HiveShim.getPrimitiveWritableConstantObjectInspector(value)
case Literal(_, NullType) =>
HiveShim.getPrimitiveNullWritableConstantObjectInspector
case Literal(value: Seq[_], ArrayType(dt, _)) =>
val listObjectInspector = toInspector(dt)
val list = new java.util.ArrayList[Object]()
value.foreach(v => list.add(wrap(v, listObjectInspector)))
ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, list)
case Literal(map: Map[_, _], MapType(keyType, valueType, _)) =>
val value = new java.util.HashMap[Object, Object]()
val keyOI = toInspector(keyType)
val valueOI = toInspector(valueType)
map.foreach (entry => value.put(wrap(entry._1, keyOI), wrap(entry._2, valueOI)))
ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, value)
case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].")
case _ => toInspector(expr.dataType)
}
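
A quick illustration (hypothetical usage, not in the diff) of how toInspector(expr) and wrap interact for a foldable literal; the string "hive" is arbitrary:

// A constant literal yields a writable-constant inspector, so wrap ignores the
// runtime value and short-circuits to getWritableConstantValue (the first case
// of the new wrap above).
val constOI = toInspector(Literal("hive", StringType))
val wrapped = wrap("ignored", constOI) // the constant hadoopIo.Text held by the inspector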

def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match {
case s: StructObjectInspector =>
StructType(s.getAllStructFieldRefs.map(f => {
@@ -291,7 +291,7 @@ private[hive] object HadoopTableReader extends HiveInspectors {
case oi: DoubleObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
case oi =>
(value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapData(value, oi)
(value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrap(value, oi)
}
}
