From 0bf9ad2ebff81f45eda9887d631cac5ddcc70d6c Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 26 Aug 2017 12:24:46 -0700 Subject: [PATCH 1/5] [SPARK-XXX][SQL] Add DataSource suite validating data sources limitations --- .../spark/sql/sources/DataSourceSuite.scala | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala new file mode 100644 index 000000000000..2230ac39d6e9 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.sql.{Date, Timestamp} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{IntWritable, NullWritable, Text} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.orc.{OrcConf, TypeDescription} +import org.apache.orc.mapred.OrcStruct +import org.apache.orc.mapreduce.{OrcInputFormat, OrcOutputFormat} +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory} +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSQLContext + +/** + * Data Source qualification as Apache Spark Data Sources. 
+ * - Apache Spark Data Type Value Limits + * - Predicate Push Down + */ +class DataSourceSuite + extends QueryTest + with SharedSQLContext + with BeforeAndAfterAll { + + import testImplicits._ + + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set("spark.sql.session.timeZone", "GMT") + } + + override def afterAll(): Unit = { + try { + spark.conf.unset("spark.sql.session.timeZone") + } finally { + super.afterAll() + } + } + + Seq("parquet", "orc").foreach { dataSource => + test(s"$dataSource - data type value limit") { + withTempPath { tempDir => + withTable("tab1") { + val df = (( + false, + true, + Byte.MinValue, + Byte.MaxValue, + Short.MinValue, + Short.MaxValue, + Int.MinValue, + Int.MaxValue, + Long.MinValue, + Long.MaxValue, + Float.MinValue, + Float.MaxValue, + Double.MinValue, + Double.MaxValue, + Date.valueOf("0001-01-01"), + Date.valueOf("9999-12-31"), + new Timestamp(-62135769600000L), // 0001-01-01 00:00:00.000 + new Timestamp(253402300799999L) // 9999-12-31 23:59:59.999 + ) :: Nil).toDF() + df.write.format(dataSource).save(tempDir.getCanonicalPath) + sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${tempDir.toURI}'") + checkAnswer(sql(s"SELECT ${df.schema.fieldNames.mkString(",")} FROM tab1"), df) + } + } + } + } + + // This is a port from TestMapreduceOrcOutputFormat.java of Apache ORC + test("orc - predicate push down") { + withTempDir { dir => + val conf = new JobConf() + val id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0) + val attemptContext = new TaskAttemptContextImpl(conf, id) + val typeStr = "struct" + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr) + conf.set("mapreduce.output.fileoutputformat.outputdir", dir.getCanonicalPath) + conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000) + conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true) + val outputFormat = new OrcOutputFormat[OrcStruct]() + val writer = outputFormat.getRecordWriter(attemptContext) + + // write 4000 rows with the integer and the binary string + val row = OrcStruct.createValue(TypeDescription.fromString(typeStr)).asInstanceOf[OrcStruct] + val nada = NullWritable.get() + + for(r <- 0 until 4000) { + row.setFieldValue(0, new IntWritable(r)) + row.setFieldValue(1, new Text(Integer.toBinaryString(r))) + writer.write(nada, row) + } + writer.close(attemptContext) + + OrcInputFormat.setSearchArgument(conf, + SearchArgumentFactory.newBuilder() + .between("i", PredicateLeaf.Type.LONG, 1500L, 1999L) + .build(), Array[String](null, "i", "s")) + + val split = new FileSplit( + new Path(dir.getCanonicalPath, "part-m-00000.orc"), 0, 1000000, Array[String]()) + val reader = new OrcInputFormat[OrcStruct]().createRecordReader(split, attemptContext) + + // the sarg should cause it to skip over the rows except 1000 to 2000 + for(r <- 1000 until 2000) { + assert(reader.nextKeyValue()) + val row = reader.getCurrentValue + assert(r == row.getFieldValue(0).asInstanceOf[IntWritable].get) + assert(Integer.toBinaryString(r) == row.getFieldValue(1).toString) + } + assert(!reader.nextKeyValue()) + reader.close() + } + } +} From 16031ac5771200ab9ecc47052e1b4c263fa1b8c0 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 26 Aug 2017 14:20:11 -0700 Subject: [PATCH 2/5] Use TestHiveSingleton. 
--- .../org/apache/spark/sql/sources/DataSourceSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala index 2230ac39d6e9..87a349b1ed59 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala @@ -29,10 +29,10 @@ import org.apache.orc.{OrcConf, TypeDescription} import org.apache.orc.mapred.OrcStruct import org.apache.orc.mapreduce.{OrcInputFormat, OrcOutputFormat} import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory} -import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils /** * Data Source qualification as Apache Spark Data Sources. @@ -41,8 +41,8 @@ import org.apache.spark.sql.test.SharedSQLContext */ class DataSourceSuite extends QueryTest - with SharedSQLContext - with BeforeAndAfterAll { + with SQLTestUtils + with TestHiveSingleton { import testImplicits._ From 104f24c9ad0743dc7c6329b4c0dde902e8e87de6 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 31 Aug 2017 14:49:59 -0700 Subject: [PATCH 3/5] Add JSON/CSV format and split user-defined schema and inferSchema. --- .../spark/sql/sources/DataSourceSuite.scala | 66 +++++++++++-------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala index 87a349b1ed59..afbb44526bc4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala @@ -30,7 +30,7 @@ import org.apache.orc.mapred.OrcStruct import org.apache.orc.mapreduce.{OrcInputFormat, OrcOutputFormat} import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory} -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{Dataset, QueryTest, Row} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -46,9 +46,32 @@ class DataSourceSuite import testImplicits._ + var df: Dataset[Row] = _ + override def beforeAll(): Unit = { super.beforeAll() spark.conf.set("spark.sql.session.timeZone", "GMT") + + df = (( + false, + true, + Byte.MinValue, + Byte.MaxValue, + Short.MinValue, + Short.MaxValue, + Int.MinValue, + Int.MaxValue, + Long.MinValue, + Long.MaxValue, + Float.MinValue, + Float.MaxValue, + Double.MinValue, + Double.MaxValue, + Date.valueOf("0001-01-01"), + Date.valueOf("9999-12-31"), + new Timestamp(-62135769600000L), // 0001-01-01 00:00:00.000 + new Timestamp(253402300799999L) // 9999-12-31 23:59:59.999 + ) :: Nil).toDF() } override def afterAll(): Unit = { @@ -59,33 +82,22 @@ class DataSourceSuite } } - Seq("parquet", "orc").foreach { dataSource => + Seq("parquet", "orc", "json", "csv").foreach { dataSource => test(s"$dataSource - data type value limit") { withTempPath { tempDir => - withTable("tab1") { - val df = (( - false, - true, - Byte.MinValue, - Byte.MaxValue, - Short.MinValue, - Short.MaxValue, - Int.MinValue, - Int.MaxValue, - Long.MinValue, - Long.MaxValue, - Float.MinValue, - Float.MaxValue, - Double.MinValue, - Double.MaxValue, - 
Date.valueOf("0001-01-01"), - Date.valueOf("9999-12-31"), - new Timestamp(-62135769600000L), // 0001-01-01 00:00:00.000 - new Timestamp(253402300799999L) // 9999-12-31 23:59:59.999 - ) :: Nil).toDF() - df.write.format(dataSource).save(tempDir.getCanonicalPath) - sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${tempDir.toURI}'") - checkAnswer(sql(s"SELECT ${df.schema.fieldNames.mkString(",")} FROM tab1"), df) + df.write.format(dataSource).save(tempDir.getCanonicalPath) + + // Use the same schema for saving/loading + checkAnswer( + spark.read.format(dataSource).schema(df.schema).load(tempDir.getCanonicalPath), + df) + + // Use schema inference, but skip text-based format due to its limitation + if (Seq("parquet", "orc").contains(dataSource)) { + withTable("tab1") { + sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${tempDir.toURI}'") + checkAnswer(sql(s"SELECT ${df.schema.fieldNames.mkString(",")} FROM tab1"), df) + } } } } @@ -100,7 +112,7 @@ class DataSourceSuite val typeStr = "struct" OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr) conf.set("mapreduce.output.fileoutputformat.outputdir", dir.getCanonicalPath) - conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000) + conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000) conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true) val outputFormat = new OrcOutputFormat[OrcStruct]() val writer = outputFormat.getRecordWriter(attemptContext) From 44663258c529f0c426b777736309e5a82cf72d92 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 1 Sep 2017 09:18:02 -0700 Subject: [PATCH 4/5] Update ORC PPD test case more general. --- .../spark/sql/sources/DataSourceSuite.scala | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala index afbb44526bc4..8c24a86bb20a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala @@ -20,14 +20,14 @@ package org.apache.spark.sql.sources import java.sql.{Date, Timestamp} import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{IntWritable, NullWritable, Text} +import org.apache.hadoop.io.LongWritable import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.orc.{OrcConf, TypeDescription} +import org.apache.orc.OrcConf import org.apache.orc.mapred.OrcStruct -import org.apache.orc.mapreduce.{OrcInputFormat, OrcOutputFormat} +import org.apache.orc.mapreduce.OrcInputFormat import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory} import org.apache.spark.sql.{Dataset, QueryTest, Row} @@ -103,46 +103,40 @@ class DataSourceSuite } } - // This is a port from TestMapreduceOrcOutputFormat.java of Apache ORC test("orc - predicate push down") { withTempDir { dir => + dir.delete() + + // write 4000 rows with the integer and the string in a single orc file + spark + .range(4000) + .map(i => (i, s"$i")) + .toDF("i", "s") + .repartition(1) + .write + .option(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000) + .orc(dir.getCanonicalPath) + val fileName = dir.list().find(_.endsWith(".orc")) + assert(fileName.isDefined) + + // Predicate Push-down: BETWEEN 1500 AND 1999 val conf = new JobConf() val id = new TaskAttemptID("jt", 0, TaskType.MAP, 
0, 0) val attemptContext = new TaskAttemptContextImpl(conf, id) - val typeStr = "struct" - OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr) - conf.set("mapreduce.output.fileoutputformat.outputdir", dir.getCanonicalPath) - conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000) - conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true) - val outputFormat = new OrcOutputFormat[OrcStruct]() - val writer = outputFormat.getRecordWriter(attemptContext) - - // write 4000 rows with the integer and the binary string - val row = OrcStruct.createValue(TypeDescription.fromString(typeStr)).asInstanceOf[OrcStruct] - val nada = NullWritable.get() - - for(r <- 0 until 4000) { - row.setFieldValue(0, new IntWritable(r)) - row.setFieldValue(1, new Text(Integer.toBinaryString(r))) - writer.write(nada, row) - } - writer.close(attemptContext) - OrcInputFormat.setSearchArgument(conf, SearchArgumentFactory.newBuilder() .between("i", PredicateLeaf.Type.LONG, 1500L, 1999L) .build(), Array[String](null, "i", "s")) - - val split = new FileSplit( - new Path(dir.getCanonicalPath, "part-m-00000.orc"), 0, 1000000, Array[String]()) + val path = new Path(dir.getCanonicalPath, fileName.get) + val split = new FileSplit(path, 0, Int.MaxValue, Array[String]()) val reader = new OrcInputFormat[OrcStruct]().createRecordReader(split, attemptContext) // the sarg should cause it to skip over the rows except 1000 to 2000 for(r <- 1000 until 2000) { assert(reader.nextKeyValue()) val row = reader.getCurrentValue - assert(r == row.getFieldValue(0).asInstanceOf[IntWritable].get) - assert(Integer.toBinaryString(r) == row.getFieldValue(1).toString) + assert(r == row.getFieldValue(0).asInstanceOf[LongWritable].get) + assert(r.toString == row.getFieldValue(1).toString) } assert(!reader.nextKeyValue()) reader.close() From fb72c89478149da99bd7ac402257ab7468156f9d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 3 Sep 2017 12:21:32 -0700 Subject: [PATCH 5/5] Address comments. --- .../spark/sql/sources/DataSourceSuite.scala | 82 +++++++------------ 1 file changed, 31 insertions(+), 51 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala index 8c24a86bb20a..7a9146920b57 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala @@ -19,25 +19,17 @@ package org.apache.spark.sql.sources import java.sql.{Date, Timestamp} -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileSplit -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.orc.OrcConf -import org.apache.orc.mapred.OrcStruct -import org.apache.orc.mapreduce.OrcInputFormat -import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory} import org.apache.spark.sql.{Dataset, QueryTest, Row} import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils /** * Data Source qualification as Apache Spark Data Sources. 
- * - Apache Spark Data Type Value Limits - * - Predicate Push Down + * - Apache Spark Data Type Value Limits: CSV, JSON, ORC, Parquet + * - Predicate Push Down: ORC */ class DataSourceSuite extends QueryTest @@ -84,18 +76,18 @@ class DataSourceSuite Seq("parquet", "orc", "json", "csv").foreach { dataSource => test(s"$dataSource - data type value limit") { - withTempPath { tempDir => - df.write.format(dataSource).save(tempDir.getCanonicalPath) + withTempPath { dir => + df.write.format(dataSource).save(dir.getCanonicalPath) // Use the same schema for saving/loading checkAnswer( - spark.read.format(dataSource).schema(df.schema).load(tempDir.getCanonicalPath), + spark.read.format(dataSource).schema(df.schema).load(dir.getCanonicalPath), df) // Use schema inference, but skip text-based format due to its limitation if (Seq("parquet", "orc").contains(dataSource)) { withTable("tab1") { - sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${tempDir.toURI}'") + sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${dir.toURI}'") checkAnswer(sql(s"SELECT ${df.schema.fieldNames.mkString(",")} FROM tab1"), df) } } @@ -103,43 +95,31 @@ class DataSourceSuite } } - test("orc - predicate push down") { - withTempDir { dir => - dir.delete() - - // write 4000 rows with the integer and the string in a single orc file - spark - .range(4000) - .map(i => (i, s"$i")) - .toDF("i", "s") - .repartition(1) - .write - .option(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000) - .orc(dir.getCanonicalPath) - val fileName = dir.list().find(_.endsWith(".orc")) - assert(fileName.isDefined) - - // Predicate Push-down: BETWEEN 1500 AND 1999 - val conf = new JobConf() - val id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0) - val attemptContext = new TaskAttemptContextImpl(conf, id) - OrcInputFormat.setSearchArgument(conf, - SearchArgumentFactory.newBuilder() - .between("i", PredicateLeaf.Type.LONG, 1500L, 1999L) - .build(), Array[String](null, "i", "s")) - val path = new Path(dir.getCanonicalPath, fileName.get) - val split = new FileSplit(path, 0, Int.MaxValue, Array[String]()) - val reader = new OrcInputFormat[OrcStruct]().createRecordReader(split, attemptContext) - - // the sarg should cause it to skip over the rows except 1000 to 2000 - for(r <- 1000 until 2000) { - assert(reader.nextKeyValue()) - val row = reader.getCurrentValue - assert(r == row.getFieldValue(0).asInstanceOf[LongWritable].get) - assert(r.toString == row.getFieldValue(1).toString) + Seq("orc").foreach { dataSource => + test(s"$dataSource - predicate push down") { + withSQLConf( + SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { dir => + // write 4000 rows with the integer and the string in a single orc file with stride 1000 + spark + .range(4000) + .map(i => (i, s"$i")) + .toDF("i", "s") + .repartition(1) + .write + .option(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000) + // TODO: Add Parquet option, too. + .format(dataSource) + .save(dir.getCanonicalPath) + + val df = spark.read.format(dataSource).load(dir.getCanonicalPath) + .where(s"i BETWEEN 1500 AND 1999") + // skip over the rows except 1000 to 2000 + val answer = spark.range(1000, 2000).map(i => (i, s"$i")).toDF("i", "s") + checkAnswer(stripSparkFilter(df), answer) + } } - assert(!reader.nextKeyValue()) - reader.close() } } }