From 731a4ac810da8cd3f2a6b70a320e2a4c3ee78fa3 Mon Sep 17 00:00:00 2001
From: pgandhi
Date: Sun, 24 Mar 2019 16:07:35 -0700
Subject: [PATCH 1/2] [SPARK-24935][SQL] fix Hive UDAF with two aggregation buffers

## What changes were proposed in this pull request?

A Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107).

However, the Hive UDAF adapter in Spark always creates the buffer with PARTIAL1 mode, which can only deal with one input: the original data. This PR fixes it.

All credit goes to pgandhi999, who investigated the problem, studied the Hive UDAF behaviors, and wrote the tests.

Closes https://github.com/apache/spark/pull/23778

## How was this patch tested?

A new test.

Closes #24144 from cloud-fan/hive.

Lead-authored-by: pgandhi
Co-authored-by: Wenchen Fan
Signed-off-by: gatorsmile
---
 .../org/apache/spark/sql/hive/hiveUDFs.scala  |  64 +++++++----
 .../sql/hive/execution/HiveUDAFSuite.scala    | 106 ++++++++++++++++++
 2 files changed, 147 insertions(+), 23 deletions(-)
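For illustration, the mode-dependent behaviour described above can be seen with the `MockUDAFEvaluator2` test evaluator added later in this patch. The REPL-style fragment below is not part of the change; it only assumes the Hive classes already named in the diff, and shows that the same UDAF hands out different buffer classes depending on the mode passed to `init()`:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

val inputOIs = Array[ObjectInspector](PrimitiveObjectInspectorFactory.javaLongObjectInspector)

// Map side: PARTIAL1 consumes original rows, so the evaluator hands out its "raw input" buffer.
val partial1Evaluator = new MockUDAFEvaluator2
val partialResultOI = partial1Evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs)
partial1Evaluator.getNewAggregationBuffer    // a MockUDAFBuffer

// Reduce side: FINAL consumes partial results, so the same UDAF returns a different buffer class.
val finalEvaluator = new MockUDAFEvaluator2
finalEvaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(partialResultOI))
finalEvaluator.getNewAggregationBuffer       // a MockUDAFBuffer2
```

An adapter that always initializes its evaluators with PARTIAL1 therefore hands the wrong buffer class to the merge-side code, which is what the change below fixes.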
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 4a8450901e3a7..8ece4b5caef6e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -352,12 +352,14 @@ private[hive] case class HiveUDAFFunction(
     HiveEvaluator(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors))
   }
 
-  // The UDAF evaluator used to merge partial aggregation results.
+  // The UDAF evaluator used to consume partial aggregation results and produce final results.
+  // Hive `ObjectInspector` used to inspect final results.
   @transient
-  private lazy val partial2ModeEvaluator = {
+  private lazy val finalHiveEvaluator = {
     val evaluator = newEvaluator()
-    evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partial1HiveEvaluator.objectInspector))
-    evaluator
+    HiveEvaluator(
+      evaluator,
+      evaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(partial1HiveEvaluator.objectInspector)))
   }
 
   // Spark SQL data type of partial aggregation results
@@ -365,16 +367,6 @@ private[hive] case class HiveUDAFFunction(
   private lazy val partialResultDataType =
     inspectorToDataType(partial1HiveEvaluator.objectInspector)
 
-  // The UDAF evaluator used to compute the final result from a partial aggregation result objects.
-  // Hive `ObjectInspector` used to inspect the final aggregation result object.
-  @transient
-  private lazy val finalHiveEvaluator = {
-    val evaluator = newEvaluator()
-    HiveEvaluator(
-      evaluator,
-      evaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(partial1HiveEvaluator.objectInspector)))
-  }
-
   // Wrapper functions used to wrap Spark SQL input arguments into Hive specific format.
   @transient
   private lazy val inputWrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
@@ -401,25 +393,43 @@ private[hive] case class HiveUDAFFunction(
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
 
-  override def createAggregationBuffer(): AggregationBuffer =
-    partial1HiveEvaluator.evaluator.getNewAggregationBuffer
+  // The hive UDAF may create different buffers to handle different inputs: original data or
+  // aggregate buffer. However, the Spark UDAF framework does not expose this information when
+  // creating the buffer. Here we return null, and create the buffer in `update` and `merge`
+  // on demand, so that we can know what input we are dealing with.
+  override def createAggregationBuffer(): AggregationBuffer = null
 
   @transient
   private lazy val inputProjection = UnsafeProjection.create(children)
 
   override def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer = {
+    // The input is original data, we create buffer with the partial1 evaluator.
+    val nonNullBuffer = if (buffer == null) {
+      partial1HiveEvaluator.evaluator.getNewAggregationBuffer
+    } else {
+      buffer
+    }
+
     partial1HiveEvaluator.evaluator.iterate(
-      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
-    buffer
+      nonNullBuffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+    nonNullBuffer
   }
 
   override def merge(buffer: AggregationBuffer, input: AggregationBuffer): AggregationBuffer = {
+    // The input is aggregate buffer, we create buffer with the final evaluator.
+    val nonNullBuffer = if (buffer == null) {
+      finalHiveEvaluator.evaluator.getNewAggregationBuffer
+    } else {
+      buffer
+    }
+
     // The 2nd argument of the Hive `GenericUDAFEvaluator.merge()` method is an input aggregation
     // buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts
     // this `AggregationBuffer`s into this format before shuffling partial aggregation results, and
     // calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
-    partial2ModeEvaluator.merge(buffer, partial1HiveEvaluator.evaluator.terminatePartial(input))
-    buffer
+    finalHiveEvaluator.evaluator.merge(
+      nonNullBuffer, partial1HiveEvaluator.evaluator.terminatePartial(input))
+    nonNullBuffer
   }
 
   override def eval(buffer: AggregationBuffer): Any = {
@@ -450,11 +460,19 @@ private[hive] case class HiveUDAFFunction(
     private val mutableRow = new GenericInternalRow(1)
 
     def serialize(buffer: AggregationBuffer): Array[Byte] = {
+      // The buffer may be null if there is no input. It's unclear if the hive UDAF accepts null
+      // buffer, for safety we create an empty buffer here.
+      val nonNullBuffer = if (buffer == null) {
+        partial1HiveEvaluator.evaluator.getNewAggregationBuffer
+      } else {
+        buffer
+      }
+
       // `GenericUDAFEvaluator.terminatePartial()` converts an `AggregationBuffer` into an object
       // that can be inspected by the `ObjectInspector` returned by `GenericUDAFEvaluator.init()`.
       // Then we can unwrap it to a Spark SQL value.
       mutableRow.update(0, partialResultUnwrapper(
-        partial1HiveEvaluator.evaluator.terminatePartial(buffer)))
+        partial1HiveEvaluator.evaluator.terminatePartial(nonNullBuffer)))
       val unsafeRow = projection(mutableRow)
       val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
       unsafeRow.writeTo(bytes)
@@ -466,11 +484,11 @@ private[hive] case class HiveUDAFFunction(
       // returned by `GenericUDAFEvaluator.terminatePartial()` back to an `AggregationBuffer`. The
      // workaround here is creating an initial `AggregationBuffer` first and then merge the
      // deserialized object into the buffer.
-      val buffer = partial2ModeEvaluator.getNewAggregationBuffer
+      val buffer = finalHiveEvaluator.evaluator.getNewAggregationBuffer
       val unsafeRow = new UnsafeRow(1)
       unsafeRow.pointTo(bytes, bytes.length)
       val partialResult = unsafeRow.get(0, partialResultDataType)
-      partial2ModeEvaluator.merge(buffer, partialResultWrapper(partialResult))
+      finalHiveEvaluator.evaluator.merge(buffer, partialResultWrapper(partialResult))
       buffer
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
index 7402c9626873c..ed82dbd4b4efb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
 import test.org.apache.spark.sql.MyDoubleAvg
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -39,6 +40,7 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   protected override def beforeAll(): Unit = {
     sql(s"CREATE TEMPORARY FUNCTION mock AS '${classOf[MockUDAF].getName}'")
     sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
+    sql(s"CREATE TEMPORARY FUNCTION mock2 AS '${classOf[MockUDAF2].getName}'")
 
     Seq(
       (0: Integer) -> "val_0",
@@ -91,6 +93,23 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     ))
   }
 
+  test("customized Hive UDAF with two aggregation buffers") {
+    val df = sql("SELECT key % 2, mock2(value) FROM t GROUP BY key % 2")
+
+    val aggs = df.queryExecution.executedPlan.collect {
+      case agg: ObjectHashAggregateExec => agg
+    }
+
+    // There should be two aggregate operators, one for partial aggregation, and the other for
+    // global aggregation.
+    assert(aggs.length == 2)
+
+    checkAnswer(df, Seq(
+      Row(0, Row(1, 1)),
+      Row(1, Row(1, 1))
+    ))
+  }
+
   test("call JAVA UDAF") {
     withTempView("temp") {
       withUserDefinedFunction("myDoubleAvg" -> false) {
@@ -126,12 +145,22 @@ class MockUDAF extends AbstractGenericUDAFResolver {
   override def getEvaluator(info: Array[TypeInfo]): GenericUDAFEvaluator = new MockUDAFEvaluator
 }
 
+class MockUDAF2 extends AbstractGenericUDAFResolver {
+  override def getEvaluator(info: Array[TypeInfo]): GenericUDAFEvaluator = new MockUDAFEvaluator2
+}
+
 class MockUDAFBuffer(var nonNullCount: Long, var nullCount: Long)
   extends GenericUDAFEvaluator.AbstractAggregationBuffer {
 
   override def estimate(): Int = JavaDataModel.PRIMITIVES2 * 2
 }
 
+class MockUDAFBuffer2(var nonNullCount: Long, var nullCount: Long)
+  extends GenericUDAFEvaluator.AbstractAggregationBuffer {
+
+  override def estimate(): Int = JavaDataModel.PRIMITIVES2 * 2
+}
+
 class MockUDAFEvaluator extends GenericUDAFEvaluator {
   private val nonNullCountOI = PrimitiveObjectInspectorFactory.javaLongObjectInspector
 
@@ -183,3 +212,80 @@ class MockUDAFEvaluator extends GenericUDAFEvaluator {
 
   override def terminate(agg: AggregationBuffer): AnyRef = terminatePartial(agg)
 }
+
+// Same as MockUDAFEvaluator but using two aggregation buffers, one for PARTIAL1 and the other
+// for PARTIAL2.
+class MockUDAFEvaluator2 extends GenericUDAFEvaluator {
+  private val nonNullCountOI = PrimitiveObjectInspectorFactory.javaLongObjectInspector
+
+  private val nullCountOI = PrimitiveObjectInspectorFactory.javaLongObjectInspector
+  private var aggMode: Mode = null
+
+  private val bufferOI = {
+    val fieldNames = Seq("nonNullCount", "nullCount").asJava
+    val fieldOIs = Seq(nonNullCountOI: ObjectInspector, nullCountOI: ObjectInspector).asJava
+    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
+  }
+
+  private val nonNullCountField = bufferOI.getStructFieldRef("nonNullCount")
+
+  private val nullCountField = bufferOI.getStructFieldRef("nullCount")
+
+  override def getNewAggregationBuffer: AggregationBuffer = {
+    // These 2 modes consume original data.
+    if (aggMode == Mode.PARTIAL1 || aggMode == Mode.COMPLETE) {
+      new MockUDAFBuffer(0L, 0L)
+    } else {
+      new MockUDAFBuffer2(0L, 0L)
+    }
+  }
+
+  override def reset(agg: AggregationBuffer): Unit = {
+    val buffer = agg.asInstanceOf[MockUDAFBuffer]
+    buffer.nonNullCount = 0L
+    buffer.nullCount = 0L
+  }
+
+  override def init(mode: Mode, parameters: Array[ObjectInspector]): ObjectInspector = {
+    aggMode = mode
+    bufferOI
+  }
+
+  override def iterate(agg: AggregationBuffer, parameters: Array[AnyRef]): Unit = {
+    val buffer = agg.asInstanceOf[MockUDAFBuffer]
+    if (parameters.head eq null) {
+      buffer.nullCount += 1L
+    } else {
+      buffer.nonNullCount += 1L
+    }
+  }
+
+  override def merge(agg: AggregationBuffer, partial: Object): Unit = {
+    if (partial ne null) {
+      val nonNullCount = nonNullCountOI.get(bufferOI.getStructFieldData(partial, nonNullCountField))
+      val nullCount = nullCountOI.get(bufferOI.getStructFieldData(partial, nullCountField))
+      val buffer = agg.asInstanceOf[MockUDAFBuffer2]
+      buffer.nonNullCount += nonNullCount
+      buffer.nullCount += nullCount
+    }
+  }
+
+  // As this method is called for both states, Partial1 and Partial2, the hack in the method
+  // to check for class of aggregation buffer was necessary.
+  override def terminatePartial(agg: AggregationBuffer): AnyRef = {
+    var result: AnyRef = null
+    if (agg.getClass.toString.contains("MockUDAFBuffer2")) {
+      val buffer = agg.asInstanceOf[MockUDAFBuffer2]
+      result = Array[Object](buffer.nonNullCount: java.lang.Long, buffer.nullCount: java.lang.Long)
+    } else {
+      val buffer = agg.asInstanceOf[MockUDAFBuffer]
+      result = Array[Object](buffer.nonNullCount: java.lang.Long, buffer.nullCount: java.lang.Long)
+    }
+    result
+  }
+
+  override def terminate(agg: AggregationBuffer): AnyRef = {
+    val buffer = agg.asInstanceOf[MockUDAFBuffer2]
+    Array[Object](buffer.nonNullCount: java.lang.Long, buffer.nullCount: java.lang.Long)
+  }
+}
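The first patch still leaves one gap, which the follow-up below closes: when `ObjectHashAggregateExec` falls back to sort-based aggregation, the same UDAF instance sees `update()` calls followed by `merge()` calls. The fragment below is an illustration only (the variable names are mine; the classes are the mocks added above): it shows why handing a buffer created for the update path to a FINAL-mode evaluator fails for a mode-aware UDAF.

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

val inputOIs = Array[ObjectInspector](PrimitiveObjectInspectorFactory.javaLongObjectInspector)

// The adapter keeps one evaluator per mode, mirroring partial1HiveEvaluator / finalHiveEvaluator.
val updateEvaluator = new MockUDAFEvaluator2
val partialOI = updateEvaluator.init(Mode.PARTIAL1, inputOIs)
val mergeEvaluator = new MockUDAFEvaluator2
mergeEvaluator.init(Mode.FINAL, Array(partialOI))

// UPDATE: the PARTIAL1-mode evaluator creates a MockUDAFBuffer and iterates over raw rows.
val buf = updateEvaluator.getNewAggregationBuffer
updateEvaluator.iterate(buf, Array[AnyRef](java.lang.Long.valueOf(1L)))

// MERGE on the same buffer: the FINAL-mode merge() casts its buffer to MockUDAFBuffer2, so
// passing `buf` here would throw ClassCastException -- the case the follow-up patch handles.
val otherPartial = updateEvaluator.terminatePartial(updateEvaluator.getNewAggregationBuffer)
// mergeEvaluator.merge(buf, otherPartial)
```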
From f16ff367b85f03c41481d48fd1ebb8c2c514d776 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 30 Apr 2019 10:35:23 +0800
Subject: [PATCH 2/2] [SPARK-24935][SQL][FOLLOWUP] support INIT -> UPDATE -> MERGE -> FINISH in Hive UDAF adapter

## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/24144. #24144 missed one case: when a hash aggregate falls back to a sort aggregate, the life cycle of the UDAF is INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAFs can support it.

A Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE.

This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it into INIT -> UPDATE -> FINISH + INIT -> MERGE -> FINISH.

## How was this patch tested?

A new test case.

Closes #24459 from cloud-fan/hive-udaf.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/hive/hiveUDFs.scala  | 54 +++++++++++++------
 .../sql/hive/execution/HiveUDAFSuite.scala    | 38 ++++++++-----
 2 files changed, 64 insertions(+), 28 deletions(-)
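The core of the adapter change below can be read as a small helper. This is a simplified restatement of the new `merge()` logic, not additional behaviour; the helper name and standalone signature are mine. When `merge()` first runs on a buffer that has only seen `update()`, the adapter converts that buffer to the partial-result format with `terminatePartial()` and folds it into a fresh buffer created by the FINAL-mode evaluator, so every Hive buffer only ever sees one kind of call:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer

// `updateEvaluator` was initialized with Mode.PARTIAL1 and has only seen iterate();
// `mergeEvaluator` was initialized with Mode.FINAL.
def rebuffer(
    updateEvaluator: GenericUDAFEvaluator,
    mergeEvaluator: GenericUDAFEvaluator,
    updateOnlyBuffer: AggregationBuffer): AggregationBuffer = {
  val fresh = mergeEvaluator.getNewAggregationBuffer
  // terminatePartial() produces the shuffle format, which merge() accepts no matter which mode
  // the receiving buffer was created with.
  mergeEvaluator.merge(fresh, updateEvaluator.terminatePartial(updateOnlyBuffer))
  fresh
}
```

The `HiveUDAFBuffer(buf, canDoMerge)` wrapper introduced in the diff is what lets the adapter detect the first `merge()` after a run of `update()` calls.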
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 8ece4b5caef6e..d8d9e97282bf8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -303,6 +303,13 @@ private[hive] case class HiveGenericUDTF(
  *  - `wrap()`/`wrapperFor()`: from 3 to 1
  *  - `unwrap()`/`unwrapperFor()`: from 1 to 3
  *  - `GenericUDAFEvaluator.terminatePartial()`: from 2 to 3
+ *
+ * Note that, Hive UDAF is initialized with aggregate mode, and some specific Hive UDAFs can't
+ * mix UPDATE and MERGE actions during its life cycle. However, Spark may do UPDATE on a UDAF and
+ * then do MERGE, in case of hash aggregate falling back to sort aggregate. To work around this
+ * issue, we track the ability to do MERGE in the Hive UDAF aggregate buffer. If Spark does
+ * UPDATE then MERGE, we can detect it and re-create the aggregate buffer with a different
+ * aggregate mode.
  */
 private[hive] case class HiveUDAFFunction(
     name: String,
@@ -311,7 +318,7 @@ private[hive] case class HiveUDAFFunction(
     isUDAFBridgeRequired: Boolean = false,
     mutableAggBufferOffset: Int = 0,
     inputAggBufferOffset: Int = 0)
-  extends TypedImperativeAggregate[GenericUDAFEvaluator.AggregationBuffer]
+  extends TypedImperativeAggregate[HiveUDAFBuffer]
   with HiveInspectors
   with UserDefinedExpression {
 
@@ -397,55 +404,70 @@ private[hive] case class HiveUDAFFunction(
   // aggregate buffer. However, the Spark UDAF framework does not expose this information when
   // creating the buffer. Here we return null, and create the buffer in `update` and `merge`
   // on demand, so that we can know what input we are dealing with.
-  override def createAggregationBuffer(): AggregationBuffer = null
+  override def createAggregationBuffer(): HiveUDAFBuffer = null
 
   @transient
   private lazy val inputProjection = UnsafeProjection.create(children)
 
-  override def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer = {
+  override def update(buffer: HiveUDAFBuffer, input: InternalRow): HiveUDAFBuffer = {
     // The input is original data, we create buffer with the partial1 evaluator.
     val nonNullBuffer = if (buffer == null) {
-      partial1HiveEvaluator.evaluator.getNewAggregationBuffer
+      HiveUDAFBuffer(partial1HiveEvaluator.evaluator.getNewAggregationBuffer, false)
     } else {
       buffer
     }
 
+    assert(!nonNullBuffer.canDoMerge, "can not call `merge` then `update` on a Hive UDAF.")
+
     partial1HiveEvaluator.evaluator.iterate(
-      nonNullBuffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+      nonNullBuffer.buf, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
     nonNullBuffer
   }
 
-  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): AggregationBuffer = {
+  override def merge(buffer: HiveUDAFBuffer, input: HiveUDAFBuffer): HiveUDAFBuffer = {
     // The input is aggregate buffer, we create buffer with the final evaluator.
     val nonNullBuffer = if (buffer == null) {
-      finalHiveEvaluator.evaluator.getNewAggregationBuffer
+      HiveUDAFBuffer(finalHiveEvaluator.evaluator.getNewAggregationBuffer, true)
     } else {
       buffer
     }
 
+    // It's possible that we've called `update` of this Hive UDAF, and some specific Hive UDAF
+    // implementation can't mix the `update` and `merge` calls during its life cycle. To work
+    // around it, here we create a fresh buffer with final evaluator, and merge the existing buffer
+    // to it, and replace the existing buffer with it.
+    val mergeableBuf = if (!nonNullBuffer.canDoMerge) {
+      val newBuf = finalHiveEvaluator.evaluator.getNewAggregationBuffer
+      finalHiveEvaluator.evaluator.merge(
+        newBuf, partial1HiveEvaluator.evaluator.terminatePartial(nonNullBuffer.buf))
+      HiveUDAFBuffer(newBuf, true)
+    } else {
+      nonNullBuffer
+    }
+
     // The 2nd argument of the Hive `GenericUDAFEvaluator.merge()` method is an input aggregation
     // buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts
     // this `AggregationBuffer`s into this format before shuffling partial aggregation results, and
     // calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
     finalHiveEvaluator.evaluator.merge(
-      nonNullBuffer, partial1HiveEvaluator.evaluator.terminatePartial(input))
-    nonNullBuffer
+      mergeableBuf.buf, partial1HiveEvaluator.evaluator.terminatePartial(input.buf))
+    mergeableBuf
   }
 
-  override def eval(buffer: AggregationBuffer): Any = {
-    resultUnwrapper(finalHiveEvaluator.evaluator.terminate(buffer))
+  override def eval(buffer: HiveUDAFBuffer): Any = {
+    resultUnwrapper(finalHiveEvaluator.evaluator.terminate(buffer.buf))
   }
 
-  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+  override def serialize(buffer: HiveUDAFBuffer): Array[Byte] = {
     // Serializes an `AggregationBuffer` that holds partial aggregation results so that we can
     // shuffle it for global aggregation later.
-    aggBufferSerDe.serialize(buffer)
+    aggBufferSerDe.serialize(buffer.buf)
   }
 
-  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+  override def deserialize(bytes: Array[Byte]): HiveUDAFBuffer = {
     // Deserializes an `AggregationBuffer` from the shuffled partial aggregation phase to prepare
     // for global aggregation by merging multiple partial aggregation results within a single group.
-    aggBufferSerDe.deserialize(bytes)
+    HiveUDAFBuffer(aggBufferSerDe.deserialize(bytes), false)
   }
 
   // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
@@ -493,3 +515,5 @@ private[hive] case class HiveUDAFFunction(
     }
   }
 }
+
+case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
index ed82dbd4b4efb..27aff263d35bc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
@@ -28,10 +28,10 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
 import test.org.apache.spark.sql.MyDoubleAvg
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
@@ -93,21 +93,33 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     ))
   }
 
-  test("customized Hive UDAF with two aggregation buffers") {
-    val df = sql("SELECT key % 2, mock2(value) FROM t GROUP BY key % 2")
+  test("SPARK-24935: customized Hive UDAF with two aggregation buffers") {
+    withTempView("v") {
+      spark.range(100).createTempView("v")
+      val df = sql("SELECT id % 2, mock2(id) FROM v GROUP BY id % 2")
 
-    val aggs = df.queryExecution.executedPlan.collect {
-      case agg: ObjectHashAggregateExec => agg
-    }
+      val aggs = df.queryExecution.executedPlan.collect {
+        case agg: ObjectHashAggregateExec => agg
+      }
 
-    // There should be two aggregate operators, one for partial aggregation, and the other for
-    // global aggregation.
+      // There should be two aggregate operators, one for partial aggregation, and the other for
+      // global aggregation.
+      assert(aggs.length == 2)
 
-    checkAnswer(df, Seq(
-      Row(0, Row(1, 1)),
-      Row(1, Row(1, 1))
-    ))
+      withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "1") {
+        checkAnswer(df, Seq(
+          Row(0, Row(50, 0)),
+          Row(1, Row(50, 0))
+        ))
+      }
+
+      withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "100") {
+        checkAnswer(df, Seq(
+          Row(0, Row(50, 0)),
+          Row(1, Row(50, 0))
+        ))
+      }
+    }
   }
 
   test("call JAVA UDAF") {