From 7f94affd1b1dae71aa5f71fff3f60d877f6c3a1c Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Mon, 3 Nov 2014 15:30:39 +0530 Subject: [PATCH 1/8] Added foldable support to CreateArray --- .../apache/spark/sql/catalyst/expressions/complexTypes.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala index 917b346086dcb..b88ec96bc5abc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala @@ -107,7 +107,9 @@ case class GetField(child: Expression, fieldName: String) extends UnaryExpressio */ case class CreateArray(children: Seq[Expression]) extends Expression { override type EvaluatedType = Any - + + override def foldable = !children.exists(!_.foldable) + lazy val childTypes = children.map(_.dataType).distinct override lazy val resolved = From cb7c61e286b3339fbd216aac4c317c3279aefeaa Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Mon, 3 Nov 2014 15:32:55 +0530 Subject: [PATCH 2/8] Supported ConstantInspector for UDAF Fixed HiveUdaf wrap object issue. --- .../org/apache/spark/sql/hive/hiveUdfs.scala | 26 +++++++++++++------ .../sql/hive/execution/HiveUdfSuite.scala | 11 ++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index b255a2ebb9778..ec1b56f3f8841 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -198,12 +198,13 @@ private[hive] case class HiveGenericUdaf( @transient protected lazy val objectInspector = { - resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray) + val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray,false,false) + resolver.getEvaluator(parameterInfo) .init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray) } @transient - protected lazy val inspectors = children.map(_.dataType).map(toInspector) + protected lazy val inspectors = children.map(toInspector) def dataType: DataType = inspectorToDataType(objectInspector) @@ -233,7 +234,7 @@ private[hive] case class HiveUdaf( } @transient - protected lazy val inspectors = children.map(_.dataType).map(toInspector) + protected lazy val inspectors = children.map(ex => toInspector(ex.dataType)) def dataType: DataType = inspectorToDataType(objectInspector) @@ -266,7 +267,7 @@ private[hive] case class HiveGenericUdtf( protected lazy val function: GenericUDTF = createFunction() @transient - protected lazy val inputInspectors = children.map(_.dataType).map(toInspector) + protected lazy val inputInspectors = children.map( ex => toInspector(ex.dataType)) @transient protected lazy val outputInspector = function.initialize(inputInspectors.toArray) @@ -341,9 +342,15 @@ private[hive] case class HiveUdafFunction( createFunction[AbstractGenericUDAFResolver]() } - private val inspectors = exprs.map(_.dataType).map(toInspector).toArray - - private val function = resolver.getEvaluator(exprs.map(_.dataType.toTypeInfo).toArray) + + private val inspectors = + if(isUDAFBridgeRequired) exprs.map(ex => toInspector(ex.dataType)).toArray + else exprs.map(toInspector).toArray + + private val function = { + val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors,false,false) + resolver.getEvaluator(parameterInfo) + } private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors) @@ -356,8 +363,11 @@ private[hive] case class HiveUdafFunction( @transient val inputProjection = new InterpretedProjection(exprs) + @transient + protected lazy val cached = new Array[AnyRef](exprs.length) + def update(input: Row): Unit = { val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray - function.iterate(buffer, inputs) + function.iterate(buffer, wrap(inputs,inspectors,cached)) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 872f28d514efe..2af1ff8f0fbb7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -87,8 +87,19 @@ class HiveUdfSuite extends QueryTest { test("SPARK-2693 udaf aggregates test") { checkAnswer(sql("SELECT percentile(key,1) FROM src LIMIT 1"), sql("SELECT max(key) FROM src").collect().toSeq) + + checkAnswer(sql("SELECT percentile(key,array(1,1)) FROM src LIMIT 1"), + sql("SELECT array(max(key),max(key)) FROM src").collect().toSeq) } + test("Generic UDAF aggregates") { + checkAnswer(sql("SELECT ceiling(percentile_approx(key,0.99999)) FROM src LIMIT 1"), + sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq) + + checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9,0.9)) FROM src LIMIT 1"), + sql("SELECT array(100,100) FROM src LIMIT 1").collect().toSeq) + } + test("UDFIntegerToString") { val testData = TestHive.sparkContext.parallelize( IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil) From 47f636550e4860073846ec11a813f7cbc4bbc769 Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Mon, 3 Nov 2014 22:34:50 +0530 Subject: [PATCH 3/8] fixed test --- .../org/apache/spark/sql/hive/execution/HiveUdfSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 2af1ff8f0fbb7..1153e8a8b3ce4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -90,6 +90,8 @@ class HiveUdfSuite extends QueryTest { checkAnswer(sql("SELECT percentile(key,array(1,1)) FROM src LIMIT 1"), sql("SELECT array(max(key),max(key)) FROM src").collect().toSeq) + + TestHive.reset() } test("Generic UDAF aggregates") { @@ -98,6 +100,8 @@ class HiveUdfSuite extends QueryTest { checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9,0.9)) FROM src LIMIT 1"), sql("SELECT array(100,100) FROM src LIMIT 1").collect().toSeq) + + TestHive.reset() } test("UDFIntegerToString") { From f37fd6934467d4d02be3766db2ac8e47cf2e04b4 Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Wed, 5 Nov 2014 00:35:11 +0530 Subject: [PATCH 4/8] Fixed review comments --- .../main/scala/org/apache/spark/sql/hive/hiveUdfs.scala | 7 ++----- .../org/apache/spark/sql/hive/execution/HiveUdfSuite.scala | 2 -- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index ec1b56f3f8841..588bfdbf1f402 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -341,11 +341,8 @@ private[hive] case class HiveUdafFunction( } else { createFunction[AbstractGenericUDAFResolver]() } - - private val inspectors = - if(isUDAFBridgeRequired) exprs.map(ex => toInspector(ex.dataType)).toArray - else exprs.map(toInspector).toArray + private val inspectors = exprs.map(toInspector).toArray private val function = { val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors,false,false) @@ -368,6 +365,6 @@ private[hive] case class HiveUdafFunction( def update(input: Row): Unit = { val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray - function.iterate(buffer, wrap(inputs,inspectors,cached)) + function.iterate(buffer, wrap(inputs, inspectors, cached)) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 1153e8a8b3ce4..072d2f1c89566 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -100,8 +100,6 @@ class HiveUdfSuite extends QueryTest { checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9,0.9)) FROM src LIMIT 1"), sql("SELECT array(100,100) FROM src LIMIT 1").collect().toSeq) - - TestHive.reset() } test("UDFIntegerToString") { From 4d39105e28c4e966a5028e59a7ce7ba9def4fd49 Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Wed, 5 Nov 2014 11:47:52 +0530 Subject: [PATCH 5/8] Unified inspector creation, style check fixes --- .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 11 ++++++----- .../spark/sql/hive/execution/HiveUdfSuite.scala | 14 +++++++------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 588bfdbf1f402..68135059e2564 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -198,7 +198,7 @@ private[hive] case class HiveGenericUdaf( @transient protected lazy val objectInspector = { - val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray,false,false) + val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray, false, false) resolver.getEvaluator(parameterInfo) .init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray) } @@ -229,12 +229,13 @@ private[hive] case class HiveUdaf( @transient protected lazy val objectInspector = { - resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray) + val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray, false, false) + resolver.getEvaluator(parameterInfo) .init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray) } @transient - protected lazy val inspectors = children.map(ex => toInspector(ex.dataType)) + protected lazy val inspectors = children.map(toInspector) def dataType: DataType = inspectorToDataType(objectInspector) @@ -267,7 +268,7 @@ private[hive] case class HiveGenericUdtf( protected lazy val function: GenericUDTF = createFunction() @transient - protected lazy val inputInspectors = children.map( ex => toInspector(ex.dataType)) + protected lazy val inputInspectors = children.map(toInspector) @transient protected lazy val outputInspector = function.initialize(inputInspectors.toArray) @@ -345,7 +346,7 @@ private[hive] case class HiveUdafFunction( private val inspectors = exprs.map(toInspector).toArray private val function = { - val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors,false,false) + val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false) resolver.getEvaluator(parameterInfo) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 072d2f1c89566..9985f081bdde4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -85,22 +85,22 @@ class HiveUdfSuite extends QueryTest { } test("SPARK-2693 udaf aggregates test") { - checkAnswer(sql("SELECT percentile(key,1) FROM src LIMIT 1"), + checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"), sql("SELECT max(key) FROM src").collect().toSeq) - checkAnswer(sql("SELECT percentile(key,array(1,1)) FROM src LIMIT 1"), - sql("SELECT array(max(key),max(key)) FROM src").collect().toSeq) + checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"), + sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq) TestHive.reset() } test("Generic UDAF aggregates") { - checkAnswer(sql("SELECT ceiling(percentile_approx(key,0.99999)) FROM src LIMIT 1"), + checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999)) FROM src LIMIT 1"), sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq) - checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9,0.9)) FROM src LIMIT 1"), - sql("SELECT array(100,100) FROM src LIMIT 1").collect().toSeq) - } + checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1"), + sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq) + } test("UDFIntegerToString") { val testData = TestHive.sparkContext.parallelize( From c46db0f24ffe8e8309c93d5e166a7daaaad56923 Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Sun, 23 Nov 2014 09:31:54 +0530 Subject: [PATCH 6/8] Removed TestHive reset --- .../org/apache/spark/sql/hive/execution/HiveUdfSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 9985f081bdde4..ff9385a591d6a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -90,8 +90,6 @@ class HiveUdfSuite extends QueryTest { checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"), sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq) - - TestHive.reset() } test("Generic UDAF aggregates") { From a18f917734ffbc7e7527c05f5147135542ad86df Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Sun, 23 Nov 2014 09:37:21 +0530 Subject: [PATCH 7/8] avoid constant udf expression re-evaluation - fixes failure due to return iterator and value type mismatch --- .../main/scala/org/apache/spark/sql/hive/hiveUdfs.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 68135059e2564..44e30747b79cf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -164,6 +164,11 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq override def foldable = isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector] + @transient + protected lazy val constantReturnValue = unwrap( + returnInspector.asInstanceOf[ConstantObjectInspector].getWritableConstantValue(), + returnInspector) + @transient protected lazy val deferedObjects = argumentInspectors.map(new DeferredObjectAdapter(_)).toArray[DeferredObject] @@ -172,6 +177,8 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq override def eval(input: Row): Any = { returnInspector // Make sure initialized. + if(foldable) return constantReturnValue + var i = 0 while (i < children.length) { val idx = i From a0182e54c5ed97bb4b85c895434b4d626671ba58 Mon Sep 17 00:00:00 2001 From: Venkata Ramana Gollamudi Date: Tue, 9 Dec 2014 17:39:49 +0530 Subject: [PATCH 8/8] fixed review comment --- .../src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 44e30747b79cf..7fece4b20f52d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -165,7 +165,7 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector] @transient - protected lazy val constantReturnValue = unwrap( + protected def constantReturnValue = unwrap( returnInspector.asInstanceOf[ConstantObjectInspector].getWritableConstantValue(), returnInspector)