@@ -19,25 +19,17 @@ package org.apache.spark.sql.sources
 
 import java.sql.{Date, Timestamp}
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.orc.OrcConf
-import org.apache.orc.mapred.OrcStruct
-import org.apache.orc.mapreduce.OrcInputFormat
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory}
 
 import org.apache.spark.sql.{Dataset, QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
  * Data Source qualification as Apache Spark Data Sources.
- * - Apache Spark Data Type Value Limits
- * - Predicate Push Down
+ * - Apache Spark Data Type Value Limits: CSV, JSON, ORC, Parquet
+ * - Predicate Push Down: ORC
  */
 class DataSourceSuite
   extends QueryTest
@@ -84,62 +76,50 @@ class DataSourceSuite
 
   Seq("parquet", "orc", "json", "csv").foreach { dataSource =>
     test(s"$dataSource - data type value limit") {
-      withTempPath { tempDir =>
-        df.write.format(dataSource).save(tempDir.getCanonicalPath)
+      withTempPath { dir =>
+        df.write.format(dataSource).save(dir.getCanonicalPath)
 
         // Use the same schema for saving/loading
         checkAnswer(
-          spark.read.format(dataSource).schema(df.schema).load(tempDir.getCanonicalPath),
+          spark.read.format(dataSource).schema(df.schema).load(dir.getCanonicalPath),
           df)
 
         // Use schema inference, but skip text-based format due to its limitation
         if (Seq("parquet", "orc").contains(dataSource)) {
           withTable("tab1") {
-            sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${tempDir.toURI}'")
+            sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${dir.toURI}'")
             checkAnswer(sql(s"SELECT ${df.schema.fieldNames.mkString(",")} FROM tab1"), df)
           }
         }
       }
     }
   }
 
-  test("orc - predicate push down") {
-    withTempDir { dir =>
-      dir.delete()
-
-      // write 4000 rows with the integer and the string in a single orc file
-      spark
-        .range(4000)
-        .map(i => (i, s"$i"))
-        .toDF("i", "s")
-        .repartition(1)
-        .write
-        .option(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000)
-        .orc(dir.getCanonicalPath)
-      val fileName = dir.list().find(_.endsWith(".orc"))
-      assert(fileName.isDefined)
-
-      // Predicate Push-down: BETWEEN 1500 AND 1999
-      val conf = new JobConf()
-      val id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0)
-      val attemptContext = new TaskAttemptContextImpl(conf, id)
-      OrcInputFormat.setSearchArgument(conf,
-        SearchArgumentFactory.newBuilder()
-          .between("i", PredicateLeaf.Type.LONG, 1500L, 1999L)
-          .build(), Array[String](null, "i", "s"))
-      val path = new Path(dir.getCanonicalPath, fileName.get)
-      val split = new FileSplit(path, 0, Int.MaxValue, Array[String]())
-      val reader = new OrcInputFormat[OrcStruct]().createRecordReader(split, attemptContext)
-
-      // the sarg should cause it to skip over the rows except 1000 to 2000
-      for (r <- 1000 until 2000) {
-        assert(reader.nextKeyValue())
-        val row = reader.getCurrentValue
-        assert(r == row.getFieldValue(0).asInstanceOf[LongWritable].get)
-        assert(r.toString == row.getFieldValue(1).toString)
+  Seq("orc").foreach { dataSource =>
+    test(s"$dataSource - predicate push down") {
+      withSQLConf(
+        SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+        withTempPath { dir =>
+          // write 4000 rows with the integer and the string in a single orc file with stride 1000
+          spark
+            .range(4000)
+            .map(i => (i, s"$i"))
+            .toDF("i", "s")
+            .repartition(1)
+            .write
+            .option(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000)
+            // TODO: Add Parquet option, too.
+            .format(dataSource)
+            .save(dir.getCanonicalPath)
+
+          val df = spark.read.format(dataSource).load(dir.getCanonicalPath)
+            .where(s"i BETWEEN 1500 AND 1999")
+          // skip over the rows except 1000 to 2000
+          val answer = spark.range(1000, 2000).map(i => (i, s"$i")).toDF("i", "s")
+          checkAnswer(stripSparkFilter(df), answer)
+        }
       }
-      assert(!reader.nextKeyValue())
-      reader.close()
     }
   }
 }