@@ -2271,28 +2271,27 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
     }
   }

-  test("SPARK-30530: 'is null' filter produces incorrect results") {
-    withTempPath { path =>
-      Seq(
-        "100.0,1.0,",
-        "200.0,,",
-        "300.0,3.0,",
-        "1.0,4.0,",
-        ",4.0,",
-        "500.0,,",
-        ",6.0,",
-        "-500.0,50.5").toDF("data")
-        .repartition(1)
-        .write.text(path.getAbsolutePath)
-
-      val schema = new StructType()
-        .add("floats", FloatType)
-        .add("more_floats", FloatType)
-      val readback = spark.read
-        .schema(schema)
-        .csv(path.getAbsolutePath)
-        .filter("floats is null")
-      checkAnswer(readback, Seq(Row(null, 4.0), Row(null, 6.0)))
+  test("SPARK-30530: apply filters to malformed rows") {
+    withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { path =>
+        Seq(
+          "100.0,1.0,",
+          "200.0,,",
+          "300.0,3.0,",
+          "1.0,4.0,",
+          ",4.0,",
+          "500.0,,",
+          ",6.0,",
+          "-500.0,50.5").toDF("data")
+          .repartition(1)
+          .write.text(path.getAbsolutePath)
+        val schema = new StructType().add("floats", FloatType).add("more_floats", FloatType)
+        val readback = spark.read
+          .schema(schema)
+          .csv(path.getAbsolutePath)
+          .filter("floats is null")
+        checkAnswer(readback, Seq(Row(null, 4.0), Row(null, 6.0)))
+      }
     }
   }
 }
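For reference, a minimal standalone sketch of the behavior this test pins down, assuming spark.sql.csv.filterPushdown.enabled is the SQL conf key behind SQLConf.CSV_FILTER_PUSHDOWN_ENABLED; the object name and input path below are hypothetical and not part of the patch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{FloatType, StructType}

object CsvFilterPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("csv-filter-pushdown-sketch")
      // Enable pushing filters down into the CSV parser (default in Spark 3.x).
      .config("spark.sql.csv.filterPushdown.enabled", "true")
      .getOrCreate()

    val schema = new StructType()
      .add("floats", FloatType)
      .add("more_floats", FloatType)

    // Hypothetical input path; rows with a missing first field parse to a null "floats" value.
    val readback = spark.read
      .schema(schema)
      .csv("/tmp/floats.csv")
      .filter("floats is null") // with pushdown on, applied while rows are parsed

    readback.show()
    spark.stop()
  }
}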