@@ -134,6 +134,19 @@ object NullResultAgg extends Aggregator[AggData, AggData, AggData] {
134134 override def outputEncoder : Encoder [AggData ] = Encoders .product[AggData ]
135135}
136136
137+ case class ComplexAggData (d1 : AggData , d2 : AggData )
138+
139+ object VeryComplexResultAgg extends Aggregator [Row , String , ComplexAggData ] {
140+ override def zero : String = " "
141+ override def reduce (buffer : String , input : Row ): String = buffer + input.getString(1 )
142+ override def merge (b1 : String , b2 : String ): String = b1 + b2
143+ override def finish (reduction : String ): ComplexAggData = {
144+ ComplexAggData (AggData (reduction.length, reduction), AggData (reduction.length, reduction))
145+ }
146+ override def bufferEncoder : Encoder [String ] = Encoders .STRING
147+ override def outputEncoder : Encoder [ComplexAggData ] = Encoders .product[ComplexAggData ]
148+ }
149+
137150
138151class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
139152 import testImplicits ._
@@ -312,4 +325,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
312325 val ds3 = sql(" SELECT 'Some String' AS b, 1279869254 AS a" ).as[AggData ]
313326 assert(ds3.select(NameAgg .toColumn).schema.head.nullable === true )
314327 }
328+
329+ test(" SPARK-18147: very complex aggregator result type" ) {
330+ val df = Seq (1 -> " a" , 2 -> " b" , 2 -> " c" ).toDF(" i" , " j" )
331+
332+ checkAnswer(
333+ df.groupBy($" i" ).agg(VeryComplexResultAgg .toColumn),
334+ Row (1 , Row (Row (1 , " a" ), Row (1 , " a" ))) :: Row (2 , Row (Row (2 , " bc" ), Row (2 , " bc" ))) :: Nil )
335+ }
315336}
0 commit comments