From 8814c3a9a059bd9f222879c93b39cb0e89fc6ed3 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Sun, 12 Oct 2014 14:46:32 +0800
Subject: [PATCH 1/8] Passing ConstantObjectInspector (when necessary) for UDF initialization

---
 .../execution/HiveCompatibilitySuite.scala    |   2 +
 .../spark/sql/hive/HiveInspectors.scala       | 136 +++++++++++++++---
 .../org/apache/spark/sql/hive/hiveUdfs.scala  |  31 ++--
 ...d_struct-0-8f0ea83364b78634fbb3752c5a5c725 |   1 +
 ..._struct-1-380c9638cc6ea8ea42f187bf0cedf350 |   1 +
 ..._struct-2-22a79ac608b1249306f82f4bdc669b17 |   0
 ..._struct-3-d7e4a555934307155784904ff9df188b |   1 +
 ..._struct-0-f41043b7d9f14fa5e998c90454c7bdb1 |   1 +
 ..._struct-1-8ccdb20153debdab789ea8ad0228e2eb |   1 +
 ..._struct-2-4a62774a6de7571c8d2bcb77da63f8f3 |   0
 ..._struct-3-abffdaacb0c7076ab538fbeec072daa2 |   1 +
 11 files changed, 140 insertions(+), 35 deletions(-)
 create mode 100644 sql/hive/src/test/resources/golden/udf_named_struct-0-8f0ea83364b78634fbb3752c5a5c725
 create mode 100644 sql/hive/src/test/resources/golden/udf_named_struct-1-380c9638cc6ea8ea42f187bf0cedf350
 create mode 100644 sql/hive/src/test/resources/golden/udf_named_struct-2-22a79ac608b1249306f82f4bdc669b17
 create mode 100644 sql/hive/src/test/resources/golden/udf_named_struct-3-d7e4a555934307155784904ff9df188b
 create mode 100644 sql/hive/src/test/resources/golden/udf_struct-0-f41043b7d9f14fa5e998c90454c7bdb1
 create mode 100644 sql/hive/src/test/resources/golden/udf_struct-1-8ccdb20153debdab789ea8ad0228e2eb
 create mode 100644 sql/hive/src/test/resources/golden/udf_struct-2-4a62774a6de7571c8d2bcb77da63f8f3
 create mode 100644 sql/hive/src/test/resources/golden/udf_struct-3-abffdaacb0c7076ab538fbeec072daa2

diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 26d9ca05c851b..3fa34d011a482 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -861,6 +861,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_minute",
     "udf_modulo",
     "udf_month",
+    "udf_named_struct",
     "udf_negative",
     "udf_not",
     "udf_notequal",
@@ -894,6 +895,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_stddev_pop",
     "udf_stddev_samp",
     "udf_string",
+    "udf_struct",
     "udf_substring",
     "udf_subtract",
     "udf_sum",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index fad7373a2fa39..87fdd08a350f0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.hive

 import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory
 import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector._
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.{io => hadoopIo}
@@ -98,9 +100,12 @@ private[hive] trait HiveInspectors {
     case d: hiveIo.DateWritable => d.get
     case t: hiveIo.TimestampWritable => t.getTimestamp
     case b: hiveIo.HiveDecimalWritable => BigDecimal(b.getHiveDecimal().bigDecimalValue())
-    case list: java.util.List[_] => list.map(unwrap)
     case map: java.util.Map[_,_] => map.map { case (k, v) => (unwrap(k), unwrap(v)) }.toMap
-    case array: Array[_] => array.map(unwrap).toSeq
+    // StandardStructObjectInspector expects the data as either Object Array or java.util.List
+    case array: Array[_] => Row(array.map(unwrap): _*)
+    case array: java.util.List[_] => Row(array.toArray.map(unwrap): _*)
+    // TODO how about the ListObjectInspector
+    // case list: java.util.List[_] => list.map(unwrap)
     case p: java.lang.Short => p
     case p: java.lang.Long => p
     case p: java.lang.Float => p
@@ -139,27 +144,67 @@ private[hive] trait HiveInspectors {
         unwrapData(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
   }

-  /** Converts native catalyst types to the types expected by Hive */
-  def wrap(a: Any): AnyRef = a match {
-    case s: String => s: java.lang.String
-    case i: Int => i: java.lang.Integer
-    case b: Boolean => b: java.lang.Boolean
-    case f: Float => f: java.lang.Float
-    case d: Double => d: java.lang.Double
-    case l: Long => l: java.lang.Long
-    case l: Short => l: java.lang.Short
-    case l: Byte => l: java.lang.Byte
-    case b: BigDecimal => HiveShim.createDecimal(b.underlying())
-    case b: Array[Byte] => b
-    case d: java.sql.Date => d
-    case t: java.sql.Timestamp => t
-    case s: Seq[_] => seqAsJavaList(s.map(wrap))
-    case m: Map[_,_] =>
-      // Some UDFs seem to assume we pass in a HashMap.
-      val hashMap = new java.util.HashMap[AnyRef, AnyRef]()
-      hashMap.putAll(m.map { case (k, v) => wrap(k) -> wrap(v) })
-      hashMap
-    case null => null
+  /**
+   * Converts native catalyst types to the types expected by Hive
+   * @param a the value to be wrapped
+   * @param oi the destination ObjectInspector, which is supposed to be the ObjectInspector
+   *           enumerated in functions:
+   *           def toInspector(dataType: DataType)
+   *           def toInspector(expr: Expression)
+   */
+  def wrap(a: Any, oi: ObjectInspector): AnyRef = if (a == null) {
+    null
+  } else {
+    oi match {
+      case x: ConstantObjectInspector => x.getWritableConstantValue
+      case x: PrimitiveObjectInspector => a match {
+        case s: String => s: java.lang.String
+        case i: Int => i: java.lang.Integer
+        case b: Boolean => b: java.lang.Boolean
+        case f: Float => f: java.lang.Float
+        case d: Double => d: java.lang.Double
+        case l: Long => l: java.lang.Long
+        case l: Short => l: java.lang.Short
+        case l: Byte => l: java.lang.Byte
+        case b: BigDecimal => HiveShim.createDecimal(b.underlying())
+        case b: Array[Byte] => b
+        case d: java.sql.Date => d
+        case t: java.sql.Timestamp => t
+      }
+      case x: StructObjectInspector => {
+        val fieldRefs = x.getAllStructFieldRefs
+        val row = a.asInstanceOf[Seq[_]]
+        val result = new java.util.ArrayList[AnyRef](fieldRefs.length)
+        var i = 0
+        while(i < fieldRefs.length) {
+          result.add(wrap(row(i), fieldRefs.get(i).getFieldObjectInspector))
+          i += 1
+        }
+
+        result
+      }
+      case x: ListObjectInspector =>
+        seqAsJavaList(a.asInstanceOf[Seq[_]].map(v => wrap(v, x.getListElementObjectInspector)))
+      case x: MapObjectInspector =>
+        // Some UDFs seem to assume we pass in a HashMap.
+        val hashMap = new java.util.HashMap[AnyRef, AnyRef]()
+        hashMap.putAll(a.asInstanceOf[Map[_, _]].map {
+          case (k, v) =>
+            wrap(k, x.getMapKeyObjectInspector) -> wrap(v, x.getMapValueObjectInspector)
+        })
+
+        hashMap
+    }
+  }
+
+  def wrap(row: Seq[Any], inspectors: Seq[ObjectInspector], cache: Array[AnyRef])
+    : Array[AnyRef] = {
+    var i = 0
+    while(i < inspectors.length) {
+      cache(i) = wrap(row(i), inspectors(i))
+      i += 1
+    }
+    cache
   }

   def toInspector(dataType: DataType): ObjectInspector = dataType match {
@@ -186,6 +231,51 @@ private[hive] trait HiveInspectors {
       fields.map(f => f.name),
       fields.map(f => toInspector(f.dataType)))
   }

+  def toInspector(expr: Expression): ObjectInspector = expr match {
+    case Literal(value: String, StringType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.stringTypeInfo, new hadoopIo.Text(value))
+    case Literal(value: Int, IntegerType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.intTypeInfo, new hadoopIo.IntWritable(value))
+    case Literal(value: Double, DoubleType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.doubleTypeInfo, new hiveIo.DoubleWritable(value))
+    case Literal(value: Boolean, BooleanType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.booleanTypeInfo, new hadoopIo.BooleanWritable(value))
+    case Literal(value: Long, LongType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.longTypeInfo, new hadoopIo.LongWritable(value))
+    case Literal(value: Float, FloatType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.floatTypeInfo, new hadoopIo.FloatWritable(value))
+    case Literal(value: Short, ShortType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.shortTypeInfo, new hiveIo.ShortWritable(value))
+    case Literal(value: Byte, ByteType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.byteTypeInfo, new hiveIo.ByteWritable(value))
+    case Literal(value: Array[Byte], BinaryType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.binaryTypeInfo, new hadoopIo.BytesWritable(value))
+    case Literal(value: java.sql.Date, DateType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.dateTypeInfo, new hiveIo.DateWritable(value))
+    case Literal(value: java.sql.Timestamp, TimestampType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.timestampTypeInfo, new hiveIo.TimestampWritable(value))
+    case Literal(value: BigDecimal, DecimalType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.decimalTypeInfo,
+        new hiveIo.HiveDecimalWritable(HiveShim.createDecimal(value.underlying())))
+    case Literal(_, NullType) =>
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.voidTypeInfo, null)
+    case Literal(_, _) => sys.error("Hive doesn't support the constant complicated type.")
+    case _ => toInspector(expr.dataType)
+  }
+
   def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match {
     case s: StructObjectInspector => StructType(s.getAllStructFieldRefs.map(f => {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 683c820dec305..d9f95da2265c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper

 import scala.collection.mutable.ArrayBuffer

-import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
 import org.apache.hadoop.hive.ql.exec.{UDF, UDAF}
 import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
@@ -97,7 +97,7 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
     function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo))

   @transient
-  protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray
+  protected lazy val arguments = children.map(toInspector).toArray

   @transient
   protected lazy val isUDFDeterministic = {
@@ -116,12 +116,13 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
   @transient
   lazy val dataType = javaClassToDataType(method.getReturnType)

+  @transient
+  protected lazy val cached = new Array[AnyRef](children.length)
+
   // TODO: Finish input output types.
   override def eval(input: Row): Any = {
-    val evaluatedChildren = children.map(c => wrap(c.eval(input)))
-
     unwrap(FunctionRegistry.invoke(method, function, conversionHelper
-      .convertIfNecessary(evaluatedChildren: _*): _*))
+      .convertIfNecessary(wrap(children.map(c => c.eval(input)), arguments, cached): _*): _*))
   }
 }

@@ -133,7 +134,7 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq
   type UDFType = GenericUDF

   @transient
-  protected lazy val argumentInspectors = children.map(_.dataType).map(toInspector)
+  protected lazy val argumentInspectors = children.map(toInspector)

   @transient
   protected lazy val returnInspector = function.initialize(argumentInspectors.toArray)
@@ -155,11 +156,13 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq

   // Adapter from Catalyst ExpressionResult to Hive DeferredObject
   class DeferredObjectAdapter extends DeferredObject {
     private var func: () => Any = _
-    def set(func: () => Any) {
+    private var oi: ObjectInspector = _
+    def set(func: () => Any, oi: ObjectInspector) {
       this.func = func
+      this.oi = oi
     }
     override def prepare(i: Int) = {}
-    override def get(): AnyRef = wrap(func())
+    override def get(): AnyRef = wrap(func(), oi)
   }

   lazy val dataType: DataType = inspectorToDataType(returnInspector)
@@ -169,7 +172,10 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq
     var i = 0
     while (i < children.length) {
       val idx = i
-      deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)})
+      deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(
+        () => {
+          children(idx).eval(input)
+        }, argumentInspectors(i))
       i += 1
     }
     unwrap(function.evaluate(deferedObjects))
@@ -265,6 +271,9 @@ private[hive] case class HiveGenericUdtf(
     structInspector.getAllStructFieldRefs.map(_.getFieldObjectInspector)
   }

+  @transient
+  protected lazy val udtInput = new Array[AnyRef](children.length)
+
   protected lazy val outputDataTypes = outputInspectors.map(inspectorToDataType)

   override protected def makeOutput() = {
@@ -288,9 +297,7 @@ private[hive] case class HiveGenericUdtf(
     val inputProjection = new InterpretedProjection(children)
     val collector = new UDTFCollector
     function.setCollector(collector)
-
-    val udtInput = inputProjection(input).map(wrap).toArray
-    function.process(udtInput)
+    function.process(wrap(inputProjection(input), inputInspectors, udtInput))
     collector.collectRows()
   }
diff --git a/sql/hive/src/test/resources/golden/udf_named_struct-0-8f0ea83364b78634fbb3752c5a5c725 b/sql/hive/src/test/resources/golden/udf_named_struct-0-8f0ea83364b78634fbb3752c5a5c725
new file mode 100644
index 0000000000000..9bff96e7fa20e
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_named_struct-0-8f0ea83364b78634fbb3752c5a5c725
@@ -0,0 +1 @@
+named_struct(name1, val1, name2, val2, ...) - Creates a struct with the given field names and values
diff --git a/sql/hive/src/test/resources/golden/udf_named_struct-1-380c9638cc6ea8ea42f187bf0cedf350 b/sql/hive/src/test/resources/golden/udf_named_struct-1-380c9638cc6ea8ea42f187bf0cedf350
new file mode 100644
index 0000000000000..9bff96e7fa20e
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_named_struct-1-380c9638cc6ea8ea42f187bf0cedf350
@@ -0,0 +1 @@
+named_struct(name1, val1, name2, val2, ...) - Creates a struct with the given field names and values
diff --git a/sql/hive/src/test/resources/golden/udf_named_struct-2-22a79ac608b1249306f82f4bdc669b17 b/sql/hive/src/test/resources/golden/udf_named_struct-2-22a79ac608b1249306f82f4bdc669b17
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/sql/hive/src/test/resources/golden/udf_named_struct-3-d7e4a555934307155784904ff9df188b b/sql/hive/src/test/resources/golden/udf_named_struct-3-d7e4a555934307155784904ff9df188b
new file mode 100644
index 0000000000000..de25f51b5b56d
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_named_struct-3-d7e4a555934307155784904ff9df188b
@@ -0,0 +1 @@
+{"foo":1,"bar":2} 1
diff --git a/sql/hive/src/test/resources/golden/udf_struct-0-f41043b7d9f14fa5e998c90454c7bdb1 b/sql/hive/src/test/resources/golden/udf_struct-0-f41043b7d9f14fa5e998c90454c7bdb1
new file mode 100644
index 0000000000000..062cb1bc683b1
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_struct-0-f41043b7d9f14fa5e998c90454c7bdb1
@@ -0,0 +1 @@
+struct(col1, col2, col3, ...) - Creates a struct with the given field values
diff --git a/sql/hive/src/test/resources/golden/udf_struct-1-8ccdb20153debdab789ea8ad0228e2eb b/sql/hive/src/test/resources/golden/udf_struct-1-8ccdb20153debdab789ea8ad0228e2eb
new file mode 100644
index 0000000000000..062cb1bc683b1
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_struct-1-8ccdb20153debdab789ea8ad0228e2eb
@@ -0,0 +1 @@
+struct(col1, col2, col3, ...) - Creates a struct with the given field values
diff --git a/sql/hive/src/test/resources/golden/udf_struct-2-4a62774a6de7571c8d2bcb77da63f8f3 b/sql/hive/src/test/resources/golden/udf_struct-2-4a62774a6de7571c8d2bcb77da63f8f3
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/sql/hive/src/test/resources/golden/udf_struct-3-abffdaacb0c7076ab538fbeec072daa2 b/sql/hive/src/test/resources/golden/udf_struct-3-abffdaacb0c7076ab538fbeec072daa2
new file mode 100644
index 0000000000000..ff1a28fa47f18
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_struct-3-abffdaacb0c7076ab538fbeec072daa2
@@ -0,0 +1 @@
+{"col1":1} {"col1":1,"col2":"a"} 1 a

From f6740fe51b6875aad007383f0199cd537ba6ad10 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Fri, 17 Oct 2014 14:45:29 +0800
Subject: [PATCH 2/8] Support Constant ObjectInspector for Map & List

---
 .../execution/HiveCompatibilitySuite.scala    |  6 +-
 .../spark/sql/hive/HiveInspectors.scala       | 78 +++++++-----------
 .../apache/spark/sql/hive/TableReader.scala   |  2 +-
 .../org/apache/spark/sql/hive/hiveUdfs.scala  | 29 ++++---
 ...nt array-0-761ef205b10ac4a10122c8b4ce10ada |  1 +
 ..._array-10-9e047718e5fea6ea79124f1e899f1c13 |  1 +
 .../sql/hive/execution/HiveQuerySuite.scala   |  8 ++
 7 files changed, 64 insertions(+), 61 deletions(-)
 create mode 100644 sql/hive/src/test/resources/golden/constant array-0-761ef205b10ac4a10122c8b4ce10ada
 create mode 100644 sql/hive/src/test/resources/golden/udf_sort_array-10-9e047718e5fea6ea79124f1e899f1c13

diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 3fa34d011a482..1a3c24be420e6 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -233,7 +233,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {

     // Sort with Limit clause causes failure.
     "ctas",
-    "ctas_hadoop20"
+    "ctas_hadoop20",
+
+    // timestamp in array: the output format of Hive contains double quotes, while
+    // Spark SQL's doesn't
+    "udf_sort_array"
   ) ++ HiveShim.compatibilityBlackList

   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 87fdd08a350f0..bbeb71fd1a04b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -80,47 +80,13 @@ private[hive] trait HiveInspectors {
     case c: Class[_] if c == classOf[java.lang.Object] => NullType
   }

-  /** Converts hive types to native catalyst types. */
-  def unwrap(a: Any): Any = a match {
-    case null => null
-    case i: hadoopIo.IntWritable => i.get
-    case t: hadoopIo.Text => t.toString
-    case l: hadoopIo.LongWritable => l.get
-    case d: hadoopIo.DoubleWritable => d.get
-    case d: hiveIo.DoubleWritable => d.get
-    case s: hiveIo.ShortWritable => s.get
-    case b: hadoopIo.BooleanWritable => b.get
-    case b: hiveIo.ByteWritable => b.get
-    case b: hadoopIo.FloatWritable => b.get
-    case b: hadoopIo.BytesWritable => {
-      val bytes = new Array[Byte](b.getLength)
-      System.arraycopy(b.getBytes(), 0, bytes, 0, b.getLength)
-      bytes
-    }
-    case d: hiveIo.DateWritable => d.get
-    case t: hiveIo.TimestampWritable => t.getTimestamp
-    case b: hiveIo.HiveDecimalWritable => BigDecimal(b.getHiveDecimal().bigDecimalValue())
-    case map: java.util.Map[_,_] => map.map { case (k, v) => (unwrap(k), unwrap(v)) }.toMap
-    // StandardStructObjectInspector expects the data as either Object Array or java.util.List
-    case array: Array[_] => Row(array.map(unwrap): _*)
-    case array: java.util.List[_] => Row(array.toArray.map(unwrap): _*)
-    // TODO how about the ListObjectInspector
-    // case list: java.util.List[_] => list.map(unwrap)
-    case p: java.lang.Short => p
-    case p: java.lang.Long => p
-    case p: java.lang.Float => p
-    case p: java.lang.Integer => p
-    case p: java.lang.Double => p
-    case p: java.lang.Byte => p
-    case p: java.lang.Boolean => p
-    case str: String => str
-    case p: java.math.BigDecimal => p
-    case p: Array[Byte] => p
-    case p: java.sql.Date => p
-    case p: java.sql.Timestamp => p
-  }
-
-  def unwrapData(data: Any, oi: ObjectInspector): Any = oi match {
+  /**
+   * Converts hive types to native catalyst types.
+   * @param data the data in Hive type
+   * @param oi   the ObjectInspector associated with the Hive type
+   * @return the data converted into a catalyst type
+   */
+  def unwrap(data: Any, oi: ObjectInspector): Any = oi match {
     case hvoi: HiveVarcharObjectInspector =>
       if (data == null) null else hvoi.getPrimitiveJavaObject(data).getValue
     case hdoi: HiveDecimalObjectInspector =>
@@ -128,27 +94,27 @@ private[hive] trait HiveInspectors {
     case pi: PrimitiveObjectInspector => pi.getPrimitiveJavaObject(data)
     case li: ListObjectInspector =>
       Option(li.getList(data))
-        .map(_.map(unwrapData(_, li.getListElementObjectInspector)).toSeq)
+        .map(_.map(unwrap(_, li.getListElementObjectInspector)).toSeq)
         .orNull
     case mi: MapObjectInspector =>
       Option(mi.getMap(data)).map(
         _.map {
           case (k,v) =>
-            (unwrapData(k, mi.getMapKeyObjectInspector),
-             unwrapData(v, mi.getMapValueObjectInspector))
+            (unwrap(k, mi.getMapKeyObjectInspector),
+             unwrap(v, mi.getMapValueObjectInspector))
         }.toMap).orNull
     case si: StructObjectInspector =>
       val allRefs = si.getAllStructFieldRefs
       new GenericRow(
         allRefs.map(r =>
-          unwrapData(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
+          unwrap(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
   }

   /**
    * Converts native catalyst types to the types expected by Hive
    * @param a the value to be wrapped
-   * @param oi the destination ObjectInspector, which is supposed to be the ObjectInspector
-   *           enumerated in functions:
+   * @param oi wrapped object used by this ObjectInspector, which is supposed to be
+   *           the ObjectInspector enumerated in functions:
    *           def toInspector(dataType: DataType)
    *           def toInspector(expr: Expression)
    */
@@ -158,6 +124,7 @@ private[hive] trait HiveInspectors {
     oi match {
       case x: ConstantObjectInspector => x.getWritableConstantValue
       case x: PrimitiveObjectInspector => a match {
+        // TODO what if x.preferWritable() == true?
         case s: String => s: java.lang.String
         case i: Int => i: java.lang.Integer
         case b: Boolean => b: java.lang.Boolean
@@ -184,7 +151,11 @@ private[hive] trait HiveInspectors {
         result
       }
       case x: ListObjectInspector =>
-        seqAsJavaList(a.asInstanceOf[Seq[_]].map(v => wrap(v, x.getListElementObjectInspector)))
+        val list = new java.util.ArrayList[Object]
+        a.asInstanceOf[Seq[_]].foreach {
+          v => list.add(wrap(v, x.getListElementObjectInspector))
+        }
+        list
       case x: MapObjectInspector =>
         // Some UDFs seem to assume we pass in a HashMap.
         val hashMap = new java.util.HashMap[AnyRef, AnyRef]()
@@ -272,6 +243,17 @@ private[hive] trait HiveInspectors {
     case Literal(_, NullType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
         TypeInfoFactory.voidTypeInfo, null)
+    case Literal(value: Seq[_], ArrayType(dt, _)) =>
+      val listObjectInspector = toInspector(dt)
+      val list = new java.util.ArrayList[Object]()
+      value.foreach(v => list.add(wrap(v, listObjectInspector)))
+      ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, list)
+    case Literal(map: Map[_, _], MapType(keyType, valueType, _)) =>
+      val value = new java.util.HashMap[Object, Object]()
+      val keyOI = toInspector(keyType)
+      val valueOI = toInspector(valueType)
+      map.foreach (entry => value.put(wrap(entry._1, keyOI), wrap(entry._2, valueOI)))
+      ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, value)
     case Literal(_, _) => sys.error("Hive doesn't support the constant complicated type.")
     case _ => toInspector(expr.dataType)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 9ff7ab5a124c1..e49f0957d188a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -291,7 +291,7 @@ private[hive] object HadoopTableReader extends HiveInspectors {
       case oi: DoubleObjectInspector =>
         (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
       case oi =>
-        (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapData(value, oi)
+        (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrap(value, oi)
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d9f95da2265c7..747bc822a0f8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -22,6 +22,8 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
 import scala.collection.mutable.ArrayBuffer

 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
 import org.apache.hadoop.hive.ql.exec.{UDF, UDAF}
 import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
@@ -116,13 +118,19 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
   @transient
   lazy val dataType = javaClassToDataType(method.getReturnType)

+  @transient
+  lazy val returnInspector = ObjectInspectorFactory.getReflectionObjectInspector(
+    method.getGenericReturnType(), ObjectInspectorOptions.JAVA)
+
   @transient
   protected lazy val cached = new Array[AnyRef](children.length)

   // TODO: Finish input output types.
   override def eval(input: Row): Any = {
-    unwrap(FunctionRegistry.invoke(method, function, conversionHelper
-      .convertIfNecessary(wrap(children.map(c => c.eval(input)), arguments, cached): _*): _*))
+    unwrap(
+      FunctionRegistry.invoke(method, function, conversionHelper
+        .convertIfNecessary(wrap(children.map(c => c.eval(input)), arguments, cached): _*): _*),
+      returnInspector)
   }
 }

@@ -178,7 +186,7 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq
         }, argumentInspectors(i))
       i += 1
     }
-    unwrap(function.evaluate(deferedObjects))
+    unwrap(function.evaluate(deferedObjects), returnInspector)
   }
 }

@@ -266,15 +274,14 @@ private[hive] case class HiveGenericUdtf(
   protected lazy val inputInspectors = children.map(_.dataType).map(toInspector)

   @transient
-  protected lazy val outputInspectors = {
-    val structInspector = function.initialize(inputInspectors.toArray)
-    structInspector.getAllStructFieldRefs.map(_.getFieldObjectInspector)
-  }
+  protected lazy val outputInspector = function.initialize(inputInspectors.toArray)

   @transient
   protected lazy val udtInput = new Array[AnyRef](children.length)

-  protected lazy val outputDataTypes = outputInspectors.map(inspectorToDataType)
+  protected lazy val outputDataTypes = outputInspector.getAllStructFieldRefs.map {
+    field => inspectorToDataType(field.getFieldObjectInspector)
+  }

   override protected def makeOutput() = {
     // Use column names when given, otherwise c_1, c_2, ... c_n.
@@ -292,7 +299,7 @@ private[hive] case class HiveGenericUdtf(
   }

   override def eval(input: Row): TraversableOnce[Row] = {
-    outputInspectors // Make sure initialized.
+    outputInspector // Make sure initialized.

     val inputProjection = new InterpretedProjection(children)
     val collector = new UDTFCollector
@@ -308,7 +315,7 @@ private[hive] case class HiveGenericUdtf(
       // We need to clone the input here because implementations of
       // GenericUDTF reuse the same object. Luckily they are always an array, so
       // it is easy to clone.
-      collected += new GenericRow(input.asInstanceOf[Array[_]].map(unwrap))
+      collected += unwrap(input, outputInspector).asInstanceOf[Row]
     }

     def collectRows() = {
@@ -349,7 +356,7 @@ private[hive] case class HiveUdafFunction(
   private val buffer =
     function.getNewAggregationBuffer.asInstanceOf[GenericUDAFEvaluator.AbstractAggregationBuffer]

-  override def eval(input: Row): Any = unwrapData(function.evaluate(buffer), returnInspector)
+  override def eval(input: Row): Any = unwrap(function.evaluate(buffer), returnInspector)

   @transient
   val inputProjection = new InterpretedProjection(exprs)
diff --git a/sql/hive/src/test/resources/golden/constant array-0-761ef205b10ac4a10122c8b4ce10ada b/sql/hive/src/test/resources/golden/constant array-0-761ef205b10ac4a10122c8b4ce10ada
new file mode 100644
index 0000000000000..94f18d09863a7
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/constant array-0-761ef205b10ac4a10122c8b4ce10ada
@@ -0,0 +1 @@
+["enterprise databases","hadoop distributed file system","hadoop map-reduce"]
diff --git a/sql/hive/src/test/resources/golden/udf_sort_array-10-9e047718e5fea6ea79124f1e899f1c13 b/sql/hive/src/test/resources/golden/udf_sort_array-10-9e047718e5fea6ea79124f1e899f1c13
new file mode 100644
index 0000000000000..9d33cd51fef04
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_sort_array-10-9e047718e5fea6ea79124f1e899f1c13
@@ -0,0 +1 @@
+[1,2,3,4,5] [1,2,7,8,9] [4,8,16,32,64] [1,100,246,357,1000] [false,true] [1.414,1.618,2.718,3.141] [1.41421,1.61803,2.71828,3.14159] ["","aramis","athos","portos"] ["1970-01-05 13:51:04.042","1970-01-07 00:54:54.442","1970-01-16 12:50:35.242"]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 322a25bb20837..ffe1f0b90fcd0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -34,6 +34,14 @@ case class TestData(a: Int, b: String)
  * A set of test cases expressed in Hive QL that are not covered by the tests
  * included in the hive distribution.
  */
 class HiveQuerySuite extends HiveComparisonTest {
+  createQueryTest("constant array",
+  """
+    |SELECT sort_array(
+    |  sort_array(
+    |    array("hadoop distributed file system",
+    |    "enterprise databases", "hadoop map-reduce")))
+    |FROM src LIMIT 1;
+  """.stripMargin)

   createQueryTest("count distinct 0 values",
     """

From fe591e431c32607e220ae17269464fbb83200f53 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Fri, 17 Oct 2014 15:02:23 +0800
Subject: [PATCH 3/8] update HiveGenericUdf to set the ObjectInspector while
 constructing the DeferredObject

---
 .../org/apache/spark/sql/hive/HiveInspectors.scala |  2 +-
 .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 14 ++++++--------
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index bbeb71fd1a04b..6bf3f3039a99f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -254,7 +254,7 @@ private[hive] trait HiveInspectors {
       val valueOI = toInspector(valueType)
       map.foreach (entry => value.put(wrap(entry._1, keyOI), wrap(entry._2, valueOI)))
       ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, value)
-    case Literal(_, _) => sys.error("Hive doesn't support the constant complicated type.")
+    case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].")
     case _ => toInspector(expr.dataType)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 747bc822a0f8e..aff4ddce92272 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -157,17 +157,15 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq
     isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
   }

-  protected lazy val deferedObjects = Array.fill[DeferredObject](children.length)({
-    new DeferredObjectAdapter
-  })
+  @transient
+  protected lazy val deferedObjects =
+    argumentInspectors.map(new DeferredObjectAdapter(_)).toArray[DeferredObject]

   // Adapter from Catalyst ExpressionResult to Hive DeferredObject
-  class DeferredObjectAdapter extends DeferredObject {
+  class DeferredObjectAdapter(oi: ObjectInspector) extends DeferredObject {
     private var func: () => Any = _
-    private var oi: ObjectInspector = _
-    def set(func: () => Any, oi: ObjectInspector) {
+    def set(func: () => Any) {
       this.func = func
-      this.oi = oi
     }
     override def prepare(i: Int) = {}
     override def get(): AnyRef = wrap(func(), oi)
@@ -183,7 +181,7 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq
       deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(
         () => {
           children(idx).eval(input)
-        }, argumentInspectors(i))
+        })
       i += 1
     }
     unwrap(function.evaluate(deferedObjects), returnInspector)

From 683d3fdcbfc06de9c73c3fc8d878899851beb3c0 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 27 Oct 2014 13:02:29 +0800
Subject: [PATCH 4/8] Add golden files

---
 .../golden/udf_sort_array-0-e86d559aeb84a4cc017a103182c22bfb | 0
 .../golden/udf_sort_array-1-976cd8b6b50a2748bbc768aa5e11cf82 | 1 +
 .../golden/udf_sort_array-2-c429ec85a6da60ebd4bc6f0f266e8b93 | 4 ++++
 .../golden/udf_sort_array-3-55c4cdaf8438b06675d60848d68f35de | 0
 4 files changed, 5 insertions(+)
 create mode 100644 sql/hive/src/test/resources/golden/udf_sort_array-0-e86d559aeb84a4cc017a103182c22bfb
 create mode 100644 sql/hive/src/test/resources/golden/udf_sort_array-1-976cd8b6b50a2748bbc768aa5e11cf82
 create mode 100644 sql/hive/src/test/resources/golden/udf_sort_array-2-c429ec85a6da60ebd4bc6f0f266e8b93
 create mode 100644 sql/hive/src/test/resources/golden/udf_sort_array-3-55c4cdaf8438b06675d60848d68f35de

diff --git a/sql/hive/src/test/resources/golden/udf_sort_array-0-e86d559aeb84a4cc017a103182c22bfb b/sql/hive/src/test/resources/golden/udf_sort_array-0-e86d559aeb84a4cc017a103182c22bfb
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/sql/hive/src/test/resources/golden/udf_sort_array-1-976cd8b6b50a2748bbc768aa5e11cf82 b/sql/hive/src/test/resources/golden/udf_sort_array-1-976cd8b6b50a2748bbc768aa5e11cf82
new file mode 100644
index 0000000000000..d514df4191b89
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_sort_array-1-976cd8b6b50a2748bbc768aa5e11cf82
@@ -0,0 +1 @@
+sort_array(array(obj1, obj2,...)) - Sorts the input array in ascending order according to the natural ordering of the array elements.
diff --git a/sql/hive/src/test/resources/golden/udf_sort_array-2-c429ec85a6da60ebd4bc6f0f266e8b93 b/sql/hive/src/test/resources/golden/udf_sort_array-2-c429ec85a6da60ebd4bc6f0f266e8b93
new file mode 100644
index 0000000000000..43e36513de881
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/udf_sort_array-2-c429ec85a6da60ebd4bc6f0f266e8b93
@@ -0,0 +1,4 @@
+sort_array(array(obj1, obj2,...)) - Sorts the input array in ascending order according to the natural ordering of the array elements.
+Example:
+  > SELECT sort_array(array('b', 'd', 'c', 'a')) FROM src LIMIT 1;
+  'a', 'b', 'c', 'd'
diff --git a/sql/hive/src/test/resources/golden/udf_sort_array-3-55c4cdaf8438b06675d60848d68f35de b/sql/hive/src/test/resources/golden/udf_sort_array-3-55c4cdaf8438b06675d60848d68f35de
new file mode 100644
index 0000000000000..e69de29bb2d1d

From 4e56e1b7b17df931f6242c8caa9cfe688536352b Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 27 Oct 2014 13:39:13 +0800
Subject: [PATCH 5/8] style issue

---
 .../spark/sql/hive/HiveInspectors.scala | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 6bf3f3039a99f..989788814fd37 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -113,10 +113,10 @@ private[hive] trait HiveInspectors {
   /**
    * Converts native catalyst types to the types expected by Hive
    * @param a the value to be wrapped
-   * @param oi wrapped object used by this ObjectInspector, which is supposed to be
-   *           the ObjectInspector enumerated in functions:
-   *           def toInspector(dataType: DataType)
-   *           def toInspector(expr: Expression)
+   * @param oi the ObjectInspector associated with the value returned by this function; it
+   *           should also be consistent with those returned from
+   *           toInspector: DataType => ObjectInspector and
+   *           toInspector: Expression => ObjectInspector
    */
   def wrap(a: Any, oi: ObjectInspector): AnyRef = if (a == null) {
     null
@@ -124,7 +124,7 @@ private[hive] trait HiveInspectors {
     oi match {
       case x: ConstantObjectInspector => x.getWritableConstantValue
       case x: PrimitiveObjectInspector => a match {
-        // TODO what if x.preferWritable() == true?
+        // TODO what if x.preferWritable() == true? reuse the writable?
         case s: String => s: java.lang.String
         case i: Int => i: java.lang.Integer
         case b: Boolean => b: java.lang.Boolean
@@ -138,7 +138,7 @@ private[hive] trait HiveInspectors {
         case d: java.sql.Date => d
         case t: java.sql.Timestamp => t
       }
-      case x: StructObjectInspector => {
+      case x: StructObjectInspector =>
         val fieldRefs = x.getAllStructFieldRefs
         val row = a.asInstanceOf[Seq[_]]
         val result = new java.util.ArrayList[AnyRef](fieldRefs.length)
@@ -149,7 +149,6 @@ private[hive] trait HiveInspectors {
         }

         result
-      }
       case x: ListObjectInspector =>
         val list = new java.util.ArrayList[Object]
         a.asInstanceOf[Seq[_]].foreach {
@@ -168,8 +167,10 @@ private[hive] trait HiveInspectors {
     }
   }

-  def wrap(row: Seq[Any], inspectors: Seq[ObjectInspector], cache: Array[AnyRef])
-    : Array[AnyRef] = {
+  def wrap(
+      row: Seq[Any],
+      inspectors: Seq[ObjectInspector],
+      cache: Array[AnyRef]): Array[AnyRef] = {
     var i = 0
     while(i < inspectors.length) {
       cache(i) = wrap(row(i), inspectors(i))

From 5793c0151a25df570988ee5d8d9bca2d37ba5363 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 27 Oct 2014 14:46:59 +0800
Subject: [PATCH 6/8] add space before while

---
 .../main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 989788814fd37..8250e21a28510 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -143,7 +143,7 @@ private[hive] trait HiveInspectors {
         val row = a.asInstanceOf[Seq[_]]
         val result = new java.util.ArrayList[AnyRef](fieldRefs.length)
         var i = 0
-        while(i < fieldRefs.length) {
+        while (i < fieldRefs.length) {
           result.add(wrap(row(i), fieldRefs.get(i).getFieldObjectInspector))
           i += 1
         }
@@ -172,7 +172,7 @@ private[hive] trait HiveInspectors {
       inspectors: Seq[ObjectInspector],
       cache: Array[AnyRef]): Array[AnyRef] = {
     var i = 0
-    while(i < inspectors.length) {
+    while (i < inspectors.length) {
       cache(i) = wrap(row(i), inspectors(i))
       i += 1
     }

From 2416e5d8b95e1b2d7ccf5cf1ad15748de251124b Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Tue, 28 Oct 2014 09:27:20 +0800
Subject: [PATCH 7/8] revert to hive 0.12

---
 .../spark/sql/hive/HiveInspectors.scala | 68 +++++++++++++++----
 1 file changed, 55 insertions(+), 13 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 8250e21a28510..e789bc2df5de5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -204,46 +204,88 @@ private[hive] trait HiveInspectors {
   }

   def toInspector(expr: Expression): ObjectInspector = expr match {
+    // TODO this is how we used it in Hive 0.13
+//    case Literal(value: String, StringType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.stringTypeInfo, new hadoopIo.Text(value))
+//    case Literal(value: Int, IntegerType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.intTypeInfo, new hadoopIo.IntWritable(value))
+//    case Literal(value: Double, DoubleType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.doubleTypeInfo, new hiveIo.DoubleWritable(value))
+//    case Literal(value: Boolean, BooleanType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.booleanTypeInfo, new hadoopIo.BooleanWritable(value))
+//    case Literal(value: Long, LongType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.longTypeInfo, new hadoopIo.LongWritable(value))
+//    case Literal(value: Float, FloatType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.floatTypeInfo, new hadoopIo.FloatWritable(value))
+//    case Literal(value: Short, ShortType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.shortTypeInfo, new hiveIo.ShortWritable(value))
+//    case Literal(value: Byte, ByteType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.byteTypeInfo, new hiveIo.ByteWritable(value))
+//    case Literal(value: Array[Byte], BinaryType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.binaryTypeInfo, new hadoopIo.BytesWritable(value))
+//    case Literal(value: java.sql.Date, DateType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.dateTypeInfo, new hiveIo.DateWritable(value))
+//    case Literal(value: java.sql.Timestamp, TimestampType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.timestampTypeInfo, new hiveIo.TimestampWritable(value))
+//    case Literal(value: BigDecimal, DecimalType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.decimalTypeInfo,
+//        new hiveIo.HiveDecimalWritable(HiveShim.createDecimal(value.underlying())))
+//    case Literal(_, NullType) =>
+//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+//        TypeInfoFactory.voidTypeInfo, null)
+    // the following code is for Hive 0.12
     case Literal(value: String, StringType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.stringTypeInfo, new hadoopIo.Text(value))
+        PrimitiveCategory.STRING, new hadoopIo.Text(value))
     case Literal(value: Int, IntegerType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.intTypeInfo, new hadoopIo.IntWritable(value))
+        PrimitiveCategory.INT, new hadoopIo.IntWritable(value))
     case Literal(value: Double, DoubleType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.doubleTypeInfo, new hiveIo.DoubleWritable(value))
+        PrimitiveCategory.DOUBLE, new hiveIo.DoubleWritable(value))
     case Literal(value: Boolean, BooleanType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.booleanTypeInfo, new hadoopIo.BooleanWritable(value))
+        PrimitiveCategory.BOOLEAN, new hadoopIo.BooleanWritable(value))
     case Literal(value: Long, LongType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.longTypeInfo, new hadoopIo.LongWritable(value))
+        PrimitiveCategory.LONG, new hadoopIo.LongWritable(value))
     case Literal(value: Float, FloatType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.floatTypeInfo, new hadoopIo.FloatWritable(value))
+        PrimitiveCategory.FLOAT, new hadoopIo.FloatWritable(value))
     case Literal(value: Short, ShortType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.shortTypeInfo, new hiveIo.ShortWritable(value))
+        PrimitiveCategory.SHORT, new hiveIo.ShortWritable(value))
     case Literal(value: Byte, ByteType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.byteTypeInfo, new hiveIo.ByteWritable(value))
+        PrimitiveCategory.BYTE, new hiveIo.ByteWritable(value))
     case Literal(value: Array[Byte], BinaryType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.binaryTypeInfo, new hadoopIo.BytesWritable(value))
+        PrimitiveCategory.BINARY, new hadoopIo.BytesWritable(value))
     case Literal(value: java.sql.Date, DateType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.dateTypeInfo, new hiveIo.DateWritable(value))
+        PrimitiveCategory.DATE, new hiveIo.DateWritable(value))
     case Literal(value: java.sql.Timestamp, TimestampType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.timestampTypeInfo, new hiveIo.TimestampWritable(value))
+        PrimitiveCategory.TIMESTAMP, new hiveIo.TimestampWritable(value))
     case Literal(value: BigDecimal, DecimalType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.decimalTypeInfo,
+        PrimitiveCategory.DECIMAL,
         new hiveIo.HiveDecimalWritable(HiveShim.createDecimal(value.underlying())))
     case Literal(_, NullType) =>
       PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        TypeInfoFactory.voidTypeInfo, null)
+        PrimitiveCategory.VOID, null)
     case Literal(value: Seq[_], ArrayType(dt, _)) =>
       val listObjectInspector = toInspector(dt)
       val list = new java.util.ArrayList[Object]()
       value.foreach(v => list.add(wrap(v, listObjectInspector)))
       ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, list)

From bcacfd7e6e97f78e7f1e782a8787a53592e70127 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Tue, 28 Oct 2014 13:59:05 +0800
Subject: [PATCH 8/8] Shim for both Hive 0.12 & 0.13.1

---
 .../spark/sql/hive/HiveInspectors.scala    | 82 +++----------------
 .../org/apache/spark/sql/hive/Shim12.scala | 57 +++++++++++++
 .../org/apache/spark/sql/hive/Shim13.scala | 64 ++++++++++++++-
 3 files changed, 132 insertions(+), 71 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index e789bc2df5de5..c6103a124df59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -204,88 +204,32 @@ private[hive] trait HiveInspectors {
   }

   def toInspector(expr: Expression): ObjectInspector = expr match {
-    // TODO this is how we used it in Hive 0.13
-//    case Literal(value: String, StringType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.stringTypeInfo, new hadoopIo.Text(value))
-//    case Literal(value: Int, IntegerType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.intTypeInfo, new hadoopIo.IntWritable(value))
-//    case Literal(value: Double, DoubleType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.doubleTypeInfo, new hiveIo.DoubleWritable(value))
-//    case Literal(value: Boolean, BooleanType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.booleanTypeInfo, new hadoopIo.BooleanWritable(value))
-//    case Literal(value: Long, LongType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.longTypeInfo, new hadoopIo.LongWritable(value))
-//    case Literal(value: Float, FloatType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.floatTypeInfo, new hadoopIo.FloatWritable(value))
-//    case Literal(value: Short, ShortType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.shortTypeInfo, new hiveIo.ShortWritable(value))
-//    case Literal(value: Byte, ByteType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.byteTypeInfo, new hiveIo.ByteWritable(value))
-//    case Literal(value: Array[Byte], BinaryType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.binaryTypeInfo, new hadoopIo.BytesWritable(value))
-//    case Literal(value: java.sql.Date, DateType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.dateTypeInfo, new hiveIo.DateWritable(value))
-//    case Literal(value: java.sql.Timestamp, TimestampType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.timestampTypeInfo, new hiveIo.TimestampWritable(value))
-//    case Literal(value: BigDecimal, DecimalType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.decimalTypeInfo,
-//        new hiveIo.HiveDecimalWritable(HiveShim.createDecimal(value.underlying())))
-//    case Literal(_, NullType) =>
-//      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-//        TypeInfoFactory.voidTypeInfo, null)
-    // the following code is for Hive 0.12
     case Literal(value: String, StringType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.STRING, new hadoopIo.Text(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Int, IntegerType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.INT, new hadoopIo.IntWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Double, DoubleType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.DOUBLE, new hiveIo.DoubleWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Boolean, BooleanType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.BOOLEAN, new hadoopIo.BooleanWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Long, LongType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.LONG, new hadoopIo.LongWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Float, FloatType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.FLOAT, new hadoopIo.FloatWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Short, ShortType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.SHORT, new hiveIo.ShortWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Byte, ByteType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.BYTE, new hiveIo.ByteWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: Array[Byte], BinaryType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.BINARY, new hadoopIo.BytesWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: java.sql.Date, DateType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.DATE, new hiveIo.DateWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: java.sql.Timestamp, TimestampType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.TIMESTAMP, new hiveIo.TimestampWritable(value))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(value: BigDecimal, DecimalType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.DECIMAL,
-        new hiveIo.HiveDecimalWritable(HiveShim.createDecimal(value.underlying())))
+      HiveShim.getPrimitiveWritableConstantObjectInspector(value)
     case Literal(_, NullType) =>
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-        PrimitiveCategory.VOID, null)
+      HiveShim.getPrimitiveNullWritableConstantObjectInspector
     case Literal(value: Seq[_], ArrayType(dt, _)) =>
       val listObjectInspector = toInspector(dt)
       val list = new java.util.ArrayList[Object]()
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 8cb81db8a9724..afc252ac27987 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -29,7 +29,11 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
 import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
+import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.{io => hadoopIo}
 import org.apache.hadoop.mapred.InputFormat
 import scala.collection.JavaConversions._
@@ -50,6 +54,59 @@ private[hive] object HiveShim {
     new TableDesc(serdeClass, inputFormatClass, outputFormatClass, properties)
   }

+  def getPrimitiveWritableConstantObjectInspector(value: String): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.STRING, new hadoopIo.Text(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Int): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.INT, new hadoopIo.IntWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Double): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.DOUBLE, new hiveIo.DoubleWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Boolean): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.BOOLEAN, new hadoopIo.BooleanWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Long): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.LONG, new hadoopIo.LongWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Float): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.FLOAT, new hadoopIo.FloatWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Short): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.SHORT, new hiveIo.ShortWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Byte): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.BYTE, new hiveIo.ByteWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Array[Byte]): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.BINARY, new hadoopIo.BytesWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: java.sql.Date): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.DATE, new hiveIo.DateWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: java.sql.Timestamp): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.TIMESTAMP, new hiveIo.TimestampWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: BigDecimal): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.DECIMAL,
+      new hiveIo.HiveDecimalWritable(HiveShim.createDecimal(value.underlying())))
+
+  def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      PrimitiveCategory.VOID, null)
+
   def createDriverResultsArray = new JArrayList[String]

   def processResults(results: JArrayList[String]) = results
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index b9a742cc6ef67..42cd65b2518c9 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -21,6 +21,7 @@ import java.util.{ArrayList => JArrayList}
 import java.util.Properties
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal}
 import org.apache.hadoop.hive.conf.HiveConf
@@ -28,10 +29,16 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Table, Hive, Partition}
 import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer}
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.spark.Logging
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
+import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.{io => hadoopIo}
+import org.apache.spark.Logging
+
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions

@@ -54,6 +61,59 @@ private[hive] object HiveShim {
     new TableDesc(inputFormatClass, outputFormatClass, properties)
   }

+  def getPrimitiveWritableConstantObjectInspector(value: String): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.stringTypeInfo, new hadoopIo.Text(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Int): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.intTypeInfo, new hadoopIo.IntWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Double): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.doubleTypeInfo, new hiveIo.DoubleWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Boolean): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.booleanTypeInfo, new hadoopIo.BooleanWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Long): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.longTypeInfo, new hadoopIo.LongWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Float): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.floatTypeInfo, new hadoopIo.FloatWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Short): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.shortTypeInfo, new hiveIo.ShortWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Byte): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.byteTypeInfo, new hiveIo.ByteWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: Array[Byte]): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.binaryTypeInfo, new hadoopIo.BytesWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: java.sql.Date): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.dateTypeInfo, new hiveIo.DateWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: java.sql.Timestamp): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.timestampTypeInfo, new hiveIo.TimestampWritable(value))
+
+  def getPrimitiveWritableConstantObjectInspector(value: BigDecimal): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.decimalTypeInfo,
+      new hiveIo.HiveDecimalWritable(HiveShim.createDecimal(value.underlying())))
+
+  def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.voidTypeInfo, null)
+
   def createDriverResultsArray = new JArrayList[Object]

   def processResults(results: JArrayList[Object]) = {