From 7a13c8e6c73e3824b8188b865fcaeda4c7e04117 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 13 Nov 2015 14:18:10 +0900
Subject: [PATCH 1/5] [SPARK-11677][SQL] ORC filter tests all pass if filters are actually not pushed down.

---
 .../spark/sql/hive/orc/OrcQuerySuite.scala | 58 ++++++++++++++-----
 1 file changed, 42 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 7efeab528c1d..1f670dadf3dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -57,6 +57,20 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     tempFile
   }
 
+  def extractSourceRDDToDataFrame(df: DataFrame): DataFrame = {
+
+    // This is the source RDD without Spark-side filtering.
+    val schema = df.schema
+    val childRDD = df
+      .queryExecution
+      .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+      .child
+      .execute()
+      .map(row => Row.fromSeq(row.toSeq(schema)))
+
+    sqlContext.createDataFrame(childRDD, schema)
+  }
+
   test("Read/write All Types") {
     val data = (0 to 255).map { i =>
       (s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0)
@@ -352,26 +366,38 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         import testImplicits._
 
         val path = dir.getCanonicalPath
-        sqlContext.range(10).coalesce(1).write.orc(path)
+        val data = (0 to 10).map { i =>
+          val maybeInt = if (i == 10) None else Some(i)
+          Tuple1(maybeInt)
+        }
+        createDataFrame(data).toDF("id").write.orc(path)
         val df = sqlContext.read.orc(path)
 
-        def checkPredicate(pred: Column, answer: Seq[Long]): Unit = {
-          checkAnswer(df.where(pred), answer.map(Row(_)))
+        def checkPredicate(pred: Column, answer: Seq[Any]): Unit = {
+          val sourceDf = extractSourceRDDToDataFrame(df.where(pred))
+          val expectedData = answer.map(Row(_)).toSet
+          val data = sourceDf.collect().toSet
+
+          // The result should be single row. When a filter is pushed to ORC, ORC can apply it to
+          // every row. So, we can check the number of rows returned from the ORC to make sure
+          // our filter pushdown work. A tricky part is, ORC does not process filter fully but
+          // return some possible results. So, the number is checked if it is less than
+          // the original count, and then checks if it contains the expected value.
+          val isOrcFiltered = sourceDf.count < 10 && expectedData.subsetOf(data)
+          assert(isOrcFiltered)
         }
 
-        checkPredicate('id === 5, Seq(5L))
-        checkPredicate('id <=> 5, Seq(5L))
-        checkPredicate('id < 5, 0L to 4L)
-        checkPredicate('id <= 5, 0L to 5L)
-        checkPredicate('id > 5, 6L to 9L)
-        checkPredicate('id >= 5, 5L to 9L)
-        checkPredicate('id.isNull, Seq.empty[Long])
-        checkPredicate('id.isNotNull, 0L to 9L)
-        checkPredicate('id.isin(1L, 3L, 5L), Seq(1L, 3L, 5L))
-        checkPredicate('id > 0 && 'id < 3, 1L to 2L)
-        checkPredicate('id < 1 || 'id > 8, Seq(0L, 9L))
-        checkPredicate(!('id > 3), 0L to 3L)
-        checkPredicate(!('id > 0 && 'id < 3), Seq(0L) ++ (3L to 9L))
+        checkPredicate('id === 5, Seq(5))
+        checkPredicate('id <=> 5, Seq(5))
+        checkPredicate('id < 5, 0 to 4)
+        checkPredicate('id <= 5, 0 to 5)
+        checkPredicate('id > 5, 6 to 9)
+        checkPredicate('id >= 5, 5 to 9)
+        checkPredicate('id.isNull, Seq(null))
+        checkPredicate('id > 0 && 'id < 3, 0 to 2)
+        checkPredicate('id < 1 || 'id > 8, Seq(0, 9))
+        checkPredicate(!('id > 3), 0 to 3)
+        checkPredicate(!('id > 0 && 'id < 3), Seq(0) ++ (3 to 9))
       }
     }
   }

From 82d0aa773d58115b0a2b3d5fd782d473e26c2671 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 13 Nov 2015 16:43:00 +0900
Subject: [PATCH 2/5] [SPARK-11677][SQL] Add tests for is-not-null operator and in-operator

---
 .../spark/sql/hive/orc/OrcQuerySuite.scala | 42 +++++++++++--------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 1f670dadf3dd..532bab7cc4a0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -364,19 +364,25 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     withTempPath { dir =>
       withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
         import testImplicits._
-
         val path = dir.getCanonicalPath
-        val data = (0 to 10).map { i =>
-          val maybeInt = if (i == 10) None else Some(i)
-          Tuple1(maybeInt)
+
+        // For field "a", the first column has odds integers. This is to check the filtered count
+        // when `isNull` is performed.
+        // For Field "b", `isNotNull` of ORC file filters rows only when all the values are
+        // null (maybe this works differently when the data or query is complicated).
+        // So, simply here a column only having `null` is added.
+        val data = (0 until 10).map { i =>
+          val maybeInt = if (i % 2 == 0) None else Some(i)
+          val nullValue: Option[String] = None
+          (maybeInt, nullValue)
         }
-        createDataFrame(data).toDF("id").write.orc(path)
+        createDataFrame(data).toDF("a", "b").write.orc(path)
         val df = sqlContext.read.orc(path)
 
         def checkPredicate(pred: Column, answer: Seq[Any]): Unit = {
           val sourceDf = extractSourceRDDToDataFrame(df.where(pred))
-          val expectedData = answer.map(Row(_)).toSet
           val data = sourceDf.collect().toSet
+          val expectedData = answer.map(Row(_, null)).toSet
 
           // The result should be single row. When a filter is pushed to ORC, ORC can apply it to
           // every row. So, we can check the number of rows returned from the ORC to make sure
@@ -387,17 +393,19 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
           assert(isOrcFiltered)
         }
 
-        checkPredicate('id === 5, Seq(5))
-        checkPredicate('id <=> 5, Seq(5))
-        checkPredicate('id < 5, 0 to 4)
-        checkPredicate('id <= 5, 0 to 5)
-        checkPredicate('id > 5, 6 to 9)
-        checkPredicate('id >= 5, 5 to 9)
-        checkPredicate('id.isNull, Seq(null))
-        checkPredicate('id > 0 && 'id < 3, 0 to 2)
-        checkPredicate('id < 1 || 'id > 8, Seq(0, 9))
-        checkPredicate(!('id > 3), 0 to 3)
-        checkPredicate(!('id > 0 && 'id < 3), Seq(0) ++ (3 to 9))
+        checkPredicate('a === 5, Seq(5))
+        checkPredicate('a <=> 5, Seq(5))
+        checkPredicate('a < 5, Seq(1, 3))
+        checkPredicate('a <= 5, Seq(1, 3, 5))
+        checkPredicate('a > 5, Seq(7, 9))
+        checkPredicate('a >= 5, Seq(5, 7, 9))
+        checkPredicate('a.isNull, Seq(null))
+        checkPredicate('b.isNotNull, Seq())
+        checkPredicate('a.isin(3, 5, 7), Seq(3, 5, 7))
+        checkPredicate('a > 0 && 'a < 3, Seq(1))
+        checkPredicate('a < 1 || 'a > 8, Seq(9))
+        checkPredicate(!('a > 3), Seq(1, 3))
+        checkPredicate(!('a > 0 && 'a < 3), Seq(3, 5, 7, 9))
       }
     }
   }

From cd7bd12337539be93198cc7c1610b7779dbef558 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 16 Nov 2015 10:00:56 +0900
Subject: [PATCH 3/5] [SPARK-11677][SQL] Update format of expected data and update some comments

---
 .../spark/sql/hive/orc/OrcQuerySuite.scala | 47 +++++++++----------
 1 file changed, 23 insertions(+), 24 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 532bab7cc4a0..c3d27e30d080 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -367,10 +367,9 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         val path = dir.getCanonicalPath
 
         // For field "a", the first column has odds integers. This is to check the filtered count
-        // when `isNull` is performed.
-        // For Field "b", `isNotNull` of ORC file filters rows only when all the values are
-        // null (maybe this works differently when the data or query is complicated).
-        // So, simply here a column only having `null` is added.
+        // when `isNull` is performed. For Field "b", `isNotNull` of ORC file filters rows
+        // only when all the values are null (maybe this works differently when the data
+        // or query is complicated). So, simply here a column only having `null` is added.
         val data = (0 until 10).map { i =>
           val maybeInt = if (i % 2 == 0) None else Some(i)
           val nullValue: Option[String] = None
@@ -379,33 +378,33 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         createDataFrame(data).toDF("a", "b").write.orc(path)
         val df = sqlContext.read.orc(path)
 
-        def checkPredicate(pred: Column, answer: Seq[Any]): Unit = {
+        def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
           val sourceDf = extractSourceRDDToDataFrame(df.where(pred))
           val data = sourceDf.collect().toSet
-          val expectedData = answer.map(Row(_, null)).toSet
+          val expectedData = answer.toSet
 
-          // The result should be single row. When a filter is pushed to ORC, ORC can apply it to
-          // every row. So, we can check the number of rows returned from the ORC to make sure
-          // our filter pushdown work. A tricky part is, ORC does not process filter fully but
-          // return some possible results. So, the number is checked if it is less than
-          // the original count, and then checks if it contains the expected value.
+          // When a filter is pushed to ORC, ORC can apply it to rows. So, we can check
+          // the number of rows returned from the ORC to make sure our filter pushdown work.
+          // A tricky part is, ORC does not process filter rows fully but return some possible
+          // results. So, this checks if the number of result is less than the original count
+          // of data, and then checks if it contains the expected data.
           val isOrcFiltered = sourceDf.count < 10 && expectedData.subsetOf(data)
           assert(isOrcFiltered)
         }
 
-        checkPredicate('a === 5, Seq(5))
-        checkPredicate('a <=> 5, Seq(5))
-        checkPredicate('a < 5, Seq(1, 3))
-        checkPredicate('a <= 5, Seq(1, 3, 5))
-        checkPredicate('a > 5, Seq(7, 9))
-        checkPredicate('a >= 5, Seq(5, 7, 9))
-        checkPredicate('a.isNull, Seq(null))
-        checkPredicate('b.isNotNull, Seq())
-        checkPredicate('a.isin(3, 5, 7), Seq(3, 5, 7))
-        checkPredicate('a > 0 && 'a < 3, Seq(1))
-        checkPredicate('a < 1 || 'a > 8, Seq(9))
-        checkPredicate(!('a > 3), Seq(1, 3))
-        checkPredicate(!('a > 0 && 'a < 3), Seq(3, 5, 7, 9))
+        checkPredicate('a === 5, List(5).map(Row(_, null)))
+        checkPredicate('a <=> 5, List(5).map(Row(_, null)))
+        checkPredicate('a < 5, List(1, 3).map(Row(_, null)))
+        checkPredicate('a <= 5, List(1, 3, 5).map(Row(_, null)))
+        checkPredicate('a > 5, List(7, 9).map(Row(_, null)))
+        checkPredicate('a >= 5, List(5, 7, 9).map(Row(_, null)))
+        checkPredicate('a.isNull, List(null).map(Row(_, null)))
+        checkPredicate('b.isNotNull, List())
+        checkPredicate('a.isin(3, 5, 7), List(3, 5, 7).map(Row(_, null)))
+        checkPredicate('a > 0 && 'a < 3, List(1).map(Row(_, null)))
+        checkPredicate('a < 1 || 'a > 8, List(9).map(Row(_, null)))
+        checkPredicate(!('a > 3), List(1, 3).map(Row(_, null)))
+        checkPredicate(!('a > 0 && 'a < 3), List(3, 5, 7, 9).map(Row(_, null)))
       }
     }
   }

From 18c03260bdb1f406f94e3366e4bd980deb2dccde Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 9 Dec 2015 11:47:56 +0900
Subject: [PATCH 4/5] Rename extractSourceRDDToDataframe to stripSparkFilter.

---
 .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index c3d27e30d080..a8f1167bcf2e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -57,9 +57,10 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     tempFile
   }
 
-  def extractSourceRDDToDataFrame(df: DataFrame): DataFrame = {
-
-    // This is the source RDD without Spark-side filtering.
+  /**
+   * Strip Spark-side filtering in order to check if a datasource filters rows correctly.
+   */
+  protected def stripSparkFilter(df: DataFrame): DataFrame = {
     val schema = df.schema
     val childRDD = df
       .queryExecution
@@ -379,7 +380,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         val df = sqlContext.read.orc(path)
 
         def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
-          val sourceDf = extractSourceRDDToDataFrame(df.where(pred))
+          val sourceDf = stripSparkFilter(df.where(pred))
           val data = sourceDf.collect().toSet
           val expectedData = answer.toSet
 

From ce9706c1da1b68fabc75c2c6c4dcdca7e63d05a2 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 16 Dec 2015 10:57:44 +0900
Subject: [PATCH 5/5] Remove stripSparkFilter function.

---
 .../apache/spark/sql/hive/orc/OrcQuerySuite.scala | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index a8f1167bcf2e..2156806d21f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -57,21 +57,6 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     tempFile
   }
 
-  /**
-   * Strip Spark-side filtering in order to check if a datasource filters rows correctly.
-   */
-  protected def stripSparkFilter(df: DataFrame): DataFrame = {
-    val schema = df.schema
-    val childRDD = df
-      .queryExecution
-      .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
-      .child
-      .execute()
-      .map(row => Row.fromSeq(row.toSeq(schema)))
-
-    sqlContext.createDataFrame(childRDD, schema)
-  }
-
   test("Read/write All Types") {
     val data = (0 to 255).map { i =>
       (s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0)
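
Note on PATCH 5/5: the helper is deleted from OrcQuerySuite without a replacement in this file, so stripSparkFilter is presumably being promoted to a shared test utility that the ORC suite (and other data source suites) can reuse. The sketch below only illustrates that assumption: the trait name and its placement are hypothetical, while the method body simply mirrors the code removed above.

import org.apache.spark.sql.{DataFrame, Row, SQLContext}

// Hypothetical host trait for the removed helper; only the body is taken from the patches.
trait StripSparkFilterSupport {
  protected def sqlContext: SQLContext

  // Rebuilds a DataFrame from the child of the Spark-side physical Filter, i.e. exactly the
  // rows the data source handed back before Spark re-applied the predicate on its side.
  protected def stripSparkFilter(df: DataFrame): DataFrame = {
    val schema = df.schema
    val childRDD = df
      .queryExecution
      .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
      .child
      .execute()
      .map(row => Row.fromSeq(row.toSeq(schema)))

    sqlContext.createDataFrame(childRDD, schema)
  }
}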
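
For reference, the reason the test asserts "fewer rows than written" rather than an exact row count is that ORC evaluates pushed-down predicates against stripe/row-group statistics rather than per row, so it may still return rows that do not match; the subset check then confirms that no matching row was dropped. A condensed, illustrative sketch of the final shape of the check (assuming the suite's testImplicits are in scope and a stripSparkFilter helper like the one above is available; not a verbatim excerpt from the patches):

withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  val df = sqlContext.read.orc(path)

  def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
    val sourceDf = stripSparkFilter(df.where(pred))
    // Fewer than the 10 written rows means ORC really skipped data; the subset check
    // guards against the pushed filter dropping rows it should have kept.
    assert(sourceDf.count < 10 && answer.toSet.subsetOf(sourceDf.collect().toSet))
  }

  checkPredicate('a === 5, List(5).map(Row(_, null)))
  checkPredicate('b.isNotNull, List())
}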