@@ -263,24 +263,25 @@ trait FileSourceAggregatePushDownSuite
 
   test("aggregate with partition group by can be pushed down") {
     withTempPath { dir =>
-      spark.range(10).selectExpr("id", "id % 3 as p")
+      spark.range(10).selectExpr("id", "id % 3 as P")
         .write.partitionBy("p").format(format).save(dir.getCanonicalPath)
       withTempView("tmp") {
         spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        val query = "SELECT count(*), count(id), p, max(id), p, count(p), max(id)," +
+          " min(id), p FROM tmp group by p"
+        val expected = sql(query).collect
         Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(aggPushDownEnabledKey -> "true",
             vectorizedReaderEnabledKey -> enableVectorizedReader) {
-            val df = sql("SELECT count(*), count(id), p, max(id), p, count(p), max(id)," +
-              " min(id), p FROM tmp group by p")
+            val df = sql(query)
             df.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
                   "PushedAggregation: [COUNT(*), COUNT(id), MAX(id), COUNT(p), MIN(id)], " +
                     "PushedFilters: [], PushedGroupBy: [p]"
                 checkKeywordsExistsInExplain(df, expected_plan_fragment)
             }
-            checkAnswer(df, Seq(Row(3, 3, 1, 7, 1, 3, 7, 1, 1), Row(3, 3, 2, 8, 2, 3, 8, 2, 2),
-              Row(4, 4, 0, 9, 0, 4, 9, 0, 0)))
+            checkAnswer(df, expected)
           }
         }
       }
@@ -297,23 +298,24 @@ trait FileSourceAggregatePushDownSuite
         .partitionBy("p2", "p1", "p4", "p3")
         .format(format)
         .save(dir.getCanonicalPath)
+
       withTempView("tmp") {
-        spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp")
+        val query = "SELECT count(*), count(value), max(value), min(value)," +
+          " p4, p2, p3, p1 FROM tmp GROUP BY p1, p2, p3, p4"
+        val expected = sql(query).collect
         Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(aggPushDownEnabledKey -> "true",
             vectorizedReaderEnabledKey -> enableVectorizedReader) {
-            val df = sql("SELECT count(*), count(value), max(value), min(value)," +
-              " p4, p2, p3, p1 FROM tmp GROUP BY p1, p2, p3, p4")
+            val df = sql(query)
             df.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
                 val expected_plan_fragment =
                   "PushedAggregation: [COUNT(*), COUNT(value), MAX(value), MIN(value)]," +
                     "PushedFilters: [], PushedGroupBy: [p1, p2, p3, p4]"
                 checkKeywordsExistsInExplain(df, expected_plan_fragment)
             }
-            checkAnswer(df, Seq(Row(1, 1, 5, 5, 8, 1, 5, 2), Row(1, 1, 4, 4, 9, 1, 4, 2),
-              Row(2, 2, 6, 3, 8, 1, 4, 2), Row(4, 4, 10, 1, 6, 2, 5, 1),
-              Row(3, 3, 6, -4, 10, 2, 9, 2)))
+            checkAnswer(df, expected)
           }
         }
       }