@@ -21,9 +21,6 @@ import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe.TypeTag
-
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
@@ -45,7 +42,6 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
 
-
 /**
  * A test suite that tests Parquet filter2 API based filter pushdown optimization.
  *
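For readers who haven't used the Parquet filter2 API this suite exercises, here is a minimal standalone sketch of the kind of predicate the pushdown layer builds with `FilterApi` (imported in the hunk above); the column names and values are illustrative, not taken from the diff:

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

// Build `a = 1 AND b > 0` directly against the filter2 API; the suite's
// checkFilterPredicate helper asserts that Spark Catalyst filters are
// translated into predicates of this shape.
val pred: FilterPredicate = FilterApi.and(
  FilterApi.eq(FilterApi.intColumn("a"), java.lang.Integer.valueOf(1)),
  FilterApi.gt(FilterApi.intColumn("b"), java.lang.Integer.valueOf(0)))
```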
@@ -112,7 +108,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
    * dataframes as new test data.
    */
   private def withNestedDataFrame(inputDF: DataFrame)
-      (runTests: (DataFrame, String, Any => Any) => Unit): Unit = {
+      (runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
     assert(inputDF.schema.fields.length == 1)
     assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
     val df = inputDF.toDF("temp")
@@ -140,8 +136,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         "`a.b`.`c.d`", // one level nesting with column names containing `dots`
         (x: Any) => Row(x)
       )
-    ).foreach { case (df, pushDownColName, resultTransFun) =>
-      runTests(df, pushDownColName, resultTransFun)
+    ).foreach { case (df, colName, resultFun) =>
+      runTest(df, colName, resultFun)
     }
   }
 
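The renamed `runTest` callback is invoked once per generated variant, as the call sites later in this diff show. A condensed sketch of the consumption pattern, modeled on the integer test below (`withParquetDataFrame` and `checkFilterPredicate` are this suite's own helpers and are assumed here rather than reproduced):

```scala
val data = (1 to 4).map(i => Tuple1(Option(i)))
import testImplicits._
withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
  withParquetDataFrame(inputDF) { implicit df =>
    // colName may point into a nested struct (e.g. "a.temp"); resultFun
    // rewraps the expected value to match that variant's nesting.
    val attr = df(colName).expr
    checkFilterPredicate(attr === 1, classOf[Eq[_]], resultFun(1))
  }
}
```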
@@ -153,10 +149,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     val ts4 = data(3)
 
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.map(i => Tuple1(i)).toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val tsAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === TimestampType)
+        val tsAttr = df(colName).expr
+        assert(df(colName).expr.dataType === TimestampType)
 
         checkFilterPredicate(tsAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]],
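The `data.map(i => Tuple1(i)).toDF()` rewrite here (and in the date test below) pairs with the `ClassTag`/`TypeTag` imports removed at the top: wrapping each value in `Tuple1` gives the data a `Product` shape, so the stock product encoder can derive the single-column schema without a reflection-based `toDF` helper. A hedged sketch of the effect:

```scala
// Illustrative only: with testImplicits in scope, the Tuple1 wrapper
// produces a one-column DataFrame whose column is named "_1".
import testImplicits._
val df = Seq(java.sql.Timestamp.valueOf("2018-06-14 08:28:53.123"))
  .map(i => Tuple1(i)).toDF()
df.printSchema()
// root
//  |-- _1: timestamp (nullable = true)
```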
@@ -213,12 +209,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("filter pushdown - boolean") {
-    val data = true :: false :: Nil
+    val data = (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val booleanAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === BooleanType)
+        val booleanAttr = df(colName).expr
+        assert(df(colName).expr.dataType === BooleanType)
 
         checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]],
@@ -234,10 +230,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - tinyint") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toByte)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val tinyIntAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === ByteType)
+        val tinyIntAttr = df(colName).expr
+        assert(df(colName).expr.dataType === ByteType)
 
         checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]],
@@ -270,10 +266,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - smallint") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toShort)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val smallIntAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === ShortType)
+        val smallIntAttr = df(colName).expr
+        assert(df(colName).expr.dataType === ShortType)
 
         checkFilterPredicate(smallIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(smallIntAttr.isNotNull, classOf[NotEq[_]],
@@ -306,10 +302,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - integer") {
     val data = (1 to 4).map(i => Tuple1(Option(i)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val intAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === IntegerType)
+        val intAttr = df(colName).expr
+        assert(df(colName).expr.dataType === IntegerType)
 
         checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]],
@@ -342,10 +338,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - long") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toLong)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val longAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === LongType)
+        val longAttr = df(colName).expr
+        assert(df(colName).expr.dataType === LongType)
 
         checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]],
@@ -378,10 +374,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - float") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toFloat)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val floatAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === FloatType)
+        val floatAttr = df(colName).expr
+        assert(df(colName).expr.dataType === FloatType)
 
         checkFilterPredicate(floatAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(floatAttr.isNotNull, classOf[NotEq[_]],
@@ -414,10 +410,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - double") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toDouble)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val doubleAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === DoubleType)
+        val doubleAttr = df(colName).expr
+        assert(df(colName).expr.dataType === DoubleType)
 
         checkFilterPredicate(doubleAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(doubleAttr.isNotNull, classOf[NotEq[_]],
@@ -450,10 +446,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - string") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toString)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val stringAttr = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === StringType)
+        val stringAttr = df(colName).expr
+        assert(df(colName).expr.dataType === StringType)
 
         checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]],
@@ -490,10 +486,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
     val data = (1 to 4).map(i => Tuple1(Option(i.b)))
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val binaryAttr: Expression = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === BinaryType)
+        val binaryAttr: Expression = df(colName).expr
+        assert(df(colName).expr.dataType === BinaryType)
 
         checkFilterPredicate(binaryAttr === 1.b, classOf[Eq[_]], resultFun(1.b))
         checkFilterPredicate(binaryAttr <=> 1.b, classOf[Eq[_]], resultFun(1.b))
@@ -531,10 +527,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
     val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21").map(_.date)
     import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, pushDownColName, resultFun) =>
+    withNestedDataFrame(data.map(i => Tuple1(i)).toDF()) { case (inputDF, colName, resultFun) =>
       withParquetDataFrame(inputDF) { implicit df =>
-        val dateAttr: Expression = df(pushDownColName).expr
-        assert(df(pushDownColName).expr.dataType === DateType)
+        val dateAttr: Expression = df(colName).expr
+        assert(df(colName).expr.dataType === DateType)
 
         checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
         checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
@@ -603,7 +599,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.INT96.toString) {
-      withParquetDataFrame(toDF(millisData.map(i => Tuple1(i)))) { implicit df =>
+      import testImplicits._
+      withParquetDataFrame(millisData.map(i => Tuple1(i)).toDF()) { implicit df =>
         val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
         assertResult(None) {
           createParquetFilters(schema).createFilter(sources.IsNull("_1"))
@@ -623,10 +620,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       val rdd =
         spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i))))
       val dataFrame = spark.createDataFrame(rdd, StructType.fromDDL(s"a decimal($precision, 2)"))
-      withNestedDataFrame(dataFrame) { case (inputDF, pushDownColName, resultFun) =>
+      withNestedDataFrame(dataFrame) { case (inputDF, colName, resultFun) =>
         withParquetDataFrame(inputDF) { implicit df =>
-          val decimalAttr: Expression = df(pushDownColName).expr
-          assert(df(pushDownColName).expr.dataType === DecimalType(precision, 2))
+          val decimalAttr: Expression = df(colName).expr
+          assert(df(colName).expr.dataType === DecimalType(precision, 2))
 
           checkFilterPredicate(decimalAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
           checkFilterPredicate(decimalAttr.isNotNull, classOf[NotEq[_]],
@@ -1166,7 +1163,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
-    withParquetDataFrame(toDF((1 to 4).map(i => Tuple1(Tuple1(i))))) { implicit df =>
+    import testImplicits._
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i))).toDF()) { implicit df =>
       // Here the schema becomes as below:
       //
       // root
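The schema comment is cut off by the hunk context; reconstructed from the `Tuple1(Tuple1(i))` input, it should continue roughly as follows (a sketch; the nullable flags are assumed):

```scala
// (1 to 4).map(i => Tuple1(Tuple1(i))).toDF() yields a struct column whose
// inner field shares the outer field's name, so a filter on "_1" is
// ambiguous and must not be pushed down:
// root
//  |-- _1: struct (nullable = true)
//  |    |-- _1: integer (nullable = true)
```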
@@ -1308,7 +1306,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("filter pushdown - StringStartsWith") {
-    withParquetDataFrame(toDF((1 to 4).map(i => Tuple1(i + "str" + i)))) { implicit df =>
+    withParquetDataFrame {
+      import testImplicits._
+      (1 to 4).map(i => Tuple1(i + "str" + i)).toDF()
+    } { implicit df =>
       checkFilterPredicate(
         '_1.startsWith("").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
@@ -1354,7 +1355,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
 
     // SPARK-28371: make sure filter is null-safe.
-    withParquetDataFrame(toDF(Seq(Tuple1[String](null)))) { implicit df =>
+    withParquetDataFrame {
+      import testImplicits._
+      Seq(Tuple1[String](null)).toDF()
+    } { implicit df =>
       checkFilterPredicate(
         '_1.startsWith("blah").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],