From d712319be2f6fd212e9bbbae51f8dca747b50ae4 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Tue, 22 Dec 2015 00:50:05 -0800 Subject: [PATCH 1/6] [SPARK-12446][SQL] Add unit tests for JDBCRDD internal functions No tests done for JDBCRDD#compileFilter. Author: Takeshi YAMAMURO Closes #10409 from maropu/AddTestsInJdbcRdd. (cherry picked from commit 8c1b867cee816d0943184c7b485cd11e255d8130) Signed-off-by: Dongjoon Hyun --- .../execution/datasources/jdbc/JDBCRDD.scala | 60 ++++++++++--------- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 24 +++++++- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index fad482b42735..4c9a63305db8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -165,8 +165,37 @@ private[sql] object JDBCRDD extends Logging { * @return A Catalyst schema corresponding to columns in the given order. */ private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { - val fieldMap = Map(schema.fields map { x => x.metadata.getString("name") -> x }: _*) - new StructType(columns map { name => fieldMap(name) }) + val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*) + new StructType(columns.map(name => fieldMap(name))) + } + + /** + * Converts value to SQL expression. + */ + private def compileValue(value: Any): Any = value match { + case stringValue: String => s"'${escapeSql(stringValue)}'" + case timestampValue: Timestamp => "'" + timestampValue + "'" + case dateValue: Date => "'" + dateValue + "'" + case _ => value + } + + private def escapeSql(value: String): String = + if (value == null) null else StringUtils.replace(value, "'", "''") + + /** + * Turns a single Filter into a String representing a SQL expression. + * Returns null for an unhandled filter. + */ + private def compileFilter(f: Filter): String = f match { + case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" + case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}" + case LessThan(attr, value) => s"$attr < ${compileValue(value)}" + case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" + case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" + case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case IsNull(attr) => s"$attr IS NULL" + case IsNotNull(attr) => s"$attr IS NOT NULL" + case _ => null } @@ -240,37 +269,12 @@ private[sql] class JDBCRDD( if (sb.length == 0) "1" else sb.substring(1) } - /** - * Converts value to SQL expression. - */ - private def compileValue(value: Any): Any = value match { - case stringValue: String => s"'${escapeSql(stringValue)}'" - case timestampValue: Timestamp => "'" + timestampValue + "'" - case dateValue: Date => "'" + dateValue + "'" - case _ => value - } - - private def escapeSql(value: String): String = - if (value == null) null else StringUtils.replace(value, "'", "''") - - /** - * Turns a single Filter into a String representing a SQL expression. - * Returns null for an unhandled filter. 
- */ - private def compileFilter(f: Filter): String = f match { - case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" - case LessThan(attr, value) => s"$attr < ${compileValue(value)}" - case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" - case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" - case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" - case _ => null - } /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ private val filterWhereClause: String = { - val filterStrings = filters map compileFilter filter (_ != null) + val filterStrings = filters.map(JDBCRDD.compileFilter).filter(_ != null) if (filterStrings.size > 0) { val sb = new StringBuilder("WHERE ") filterStrings.foreach(x => sb.append(x).append(" AND ")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 4b389add63c4..e28dc7af1141 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -18,18 +18,22 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal -import java.sql.DriverManager +import java.sql.{Date, DriverManager, Timestamp} import java.util.{Calendar, GregorianCalendar, Properties} import org.h2.jdbc.JdbcSQLException import org.scalatest.BeforeAndAfter +import org.scalatest.PrivateMethodTester import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.sql.sources._ import org.apache.spark.util.Utils -class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext { +class JDBCSuite extends SparkFunSuite + with BeforeAndAfter with PrivateMethodTester with SharedSQLContext { import testImplicits._ val url = "jdbc:h2:mem:testdb0" @@ -427,6 +431,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(DerbyColumns === Seq(""""abc"""", """"key"""")) } + test("compile filters") { + val compileFilter = PrivateMethod[String]('compileFilter) + def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) + assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") + assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'") + assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") + assert(doCompileFilter(LessThan("col3", + Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") + assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04"))) === "col4 < '1983-08-04'") + assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") + assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") + assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") + assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") + assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") + } + test("Dialect unregister") { JdbcDialects.registerDialect(testH2Dialect) JdbcDialects.unregisterDialect(testH2Dialect) From abaae83cd26e419828067818f36d0cfd7862b3fd Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 30 Dec 2015 13:34:37 -0800 Subject: [PATCH 2/6] [SPARK-12409][SPARK-12387][SPARK-12391][SQL] Support AND/OR/IN/LIKE push-down filters for JDBC This is rework from #10386 and add more tests and LIKE push-down support. 
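For reference, the mapping from the newly supported filters to SQL fragments looks roughly like the sketch below. The case classes and helpers here are simplified stand-ins rather than Spark's actual `Filter` hierarchy or the patched `compileFilter`; see the diff for the real implementation.

```scala
// Simplified stand-ins for Spark's filter classes, used only to illustrate
// how the new AND/OR/IN/LIKE filters translate into SQL text.
sealed trait SimpleFilter
case class EqualTo(attr: String, value: Any) extends SimpleFilter
case class In(attr: String, values: Seq[Any]) extends SimpleFilter
case class StringStartsWith(attr: String, prefix: String) extends SimpleFilter
case class Not(child: SimpleFilter) extends SimpleFilter
case class Or(left: SimpleFilter, right: SimpleFilter) extends SimpleFilter
case class And(left: SimpleFilter, right: SimpleFilter) extends SimpleFilter

def compileValue(value: Any): String = value match {
  case s: String  => s"'${s.replace("'", "''")}'"        // escape embedded quotes
  case vs: Seq[_] => vs.map(compileValue).mkString(", ")  // for IN lists
  case _          => value.toString
}

def compile(f: SimpleFilter): String = f match {
  case EqualTo(a, v)          => s"$a = ${compileValue(v)}"
  case In(a, vs)              => s"$a IN (${compileValue(vs)})"
  case StringStartsWith(a, p) => s"$a LIKE '$p%'"
  case Not(child)             => s"(NOT (${compile(child)}))"
  case Or(l, r)               => s"(${compile(l)}) OR (${compile(r)})"
  case And(l, r)              => s"(${compile(l)}) AND (${compile(r)})"
}

// compile(Or(EqualTo("THEID", 1), EqualTo("NAME", "mary")))
//   => "(THEID = 1) OR (NAME = 'mary')"
```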
Author: Takeshi YAMAMURO Closes #10468 from maropu/SupportMorePushdownInJdbc. (cherry picked from commit 5c2682b0c8fd2aeae2af1adb716ee0d5f8b85135) Signed-off-by: Dongjoon Hyun --- .../execution/datasources/jdbc/JDBCRDD.scala | 9 +++- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 42 +++++++++++++++---- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 4c9a63305db8..d629c486f96b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -176,6 +176,7 @@ private[sql] object JDBCRDD extends Logging { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" case dateValue: Date => "'" + dateValue + "'" + case arrayValue: Array[Object] => arrayValue.map(compileValue).mkString(", ") case _ => value } @@ -188,13 +189,19 @@ private[sql] object JDBCRDD extends Logging { */ private def compileFilter(f: Filter): String = f match { case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" - case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}" + case Not(f) => s"(NOT (${compileFilter(f)}))" case LessThan(attr, value) => s"$attr < ${compileValue(value)}" case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'" + case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'" + case StringContains(attr, value) => s"${attr} LIKE '%${value}%'" case IsNull(attr) => s"$attr IS NULL" case IsNotNull(attr) => s"$attr IS NOT NULL" + case In(attr, value) => s"$attr IN (${compileValue(value)})" + case Or(f1, f2) => s"(${compileFilter(f1)}) OR (${compileFilter(f2)})" + case And(f1, f2) => s"(${compileFilter(f1)}) AND (${compileFilter(f2)})" case _ => null } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index e28dc7af1141..227791f75b3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.Row import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -180,12 +181,32 @@ class JDBCSuite extends SparkFunSuite } test("SELECT * WHERE (simple predicates)") { - assert(sql("SELECT * FROM foobar WHERE THEID < 1").collect().size === 0) - assert(sql("SELECT * FROM foobar WHERE THEID != 2").collect().size === 2) - assert(sql("SELECT * FROM foobar WHERE THEID = 1").collect().size === 1) - assert(sql("SELECT * FROM foobar WHERE NAME = 'fred'").collect().size === 1) - assert(sql("SELECT * FROM foobar WHERE NAME > 'fred'").collect().size === 2) - assert(sql("SELECT * FROM foobar WHERE NAME != 'fred'").collect().size === 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) + assert(stripSparkFilter(sql("SELECT * FROM 
foobar WHERE THEID != 2")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) + .collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) + .collect().size === 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) + .collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + + "AND THEID = 2")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) + + // This is a test to reflect discussion in SPARK-12218. + // The older versions of spark have this kind of bugs in parquet data source. + val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')") + val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')") + assert(df1.collect.toSet === Set(Row("mary", 2))) + assert(df2.collect.toSet === Set(Row("mary", 2))) } test("SELECT * WHERE (quoted strings)") { @@ -435,7 +456,11 @@ class JDBCSuite extends SparkFunSuite val compileFilter = PrivateMethod[String]('compileFilter) def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") - assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'") + assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))") + assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def"))) + === "(col0 = 0) AND (col1 = 'def')") + assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi"))) + === "(col0 = 2) OR (col1 = 'ghi')") assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") assert(doCompileFilter(LessThan("col3", Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") @@ -443,6 +468,9 @@ class JDBCSuite extends SparkFunSuite assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") + assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')") + assert(doCompileFilter(Not(In("col1", Array("mno", "pqr")))) + === "(NOT (col1 IN ('mno', 'pqr')))") assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") } From bb9aa2cf779bcce4555f4c6bcb7f924b5bb26989 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 Jan 2016 00:54:25 -0800 Subject: [PATCH 3/6] [SPARK-12409][SPARK-12387][SPARK-12391][SQL] Refactor filter pushdown for JDBCRDD and add few filters This patch refactors the filter pushdown for JDBCRDD and also adds few filters. 
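At the core of the refactoring, filter compilation now returns `Option[String]` instead of a nullable `String`, and `And`/`Or` are pushed down only when both children compile. A minimal sketch of that shape (hypothetical stand-in types, not the patched `JDBCRDD` code):

```scala
// Sketch only: Option-based composition of compiled filters. `Pushed` stands in
// for a leaf filter that may or may not be expressible in the target SQL dialect.
sealed trait Node
case class Pushed(sql: Option[String]) extends Node
case class AndNode(left: Node, right: Node) extends Node
case class OrNode(left: Node, right: Node) extends Node

def compile(f: Node): Option[String] = f match {
  case Pushed(sql) => sql
  // Mirrors the patch: a conjunction/disjunction is compiled only if BOTH
  // children compile; otherwise the whole node is reported as unhandled (None).
  case AndNode(l, r) => for (ls <- compile(l); rs <- compile(r)) yield s"($ls) AND ($rs)"
  case OrNode(l, r)  => for (ls <- compile(l); rs <- compile(r)) yield s"($ls) OR ($rs)"
}

// Unhandled filters then simply drop out of the generated WHERE clause:
def whereClause(filters: Seq[Node]): String = {
  val compiled = filters.flatMap(compile)
  if (compiled.isEmpty) "" else compiled.mkString("WHERE ", " AND ", "")
}
```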
Added filters are basically from #10468 with some refactoring. Test cases are from #10468. Author: Liang-Chi Hsieh Closes #10470 from viirya/refactor-jdbc-filter. (cherry picked from commit ad5b7cfcca7a5feb83b9ed94b6e725c6d789579b) Signed-off-by: Dongjoon Hyun --- .../execution/datasources/jdbc/JDBCRDD.scala | 69 +++++++++++-------- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +- 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index d629c486f96b..1d561731d1cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -176,7 +176,7 @@ private[sql] object JDBCRDD extends Logging { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" case dateValue: Date => "'" + dateValue + "'" - case arrayValue: Array[Object] => arrayValue.map(compileValue).mkString(", ") + case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } @@ -185,24 +185,41 @@ private[sql] object JDBCRDD extends Logging { /** * Turns a single Filter into a String representing a SQL expression. - * Returns null for an unhandled filter. + * Returns None for an unhandled filter. */ - private def compileFilter(f: Filter): String = f match { - case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" - case Not(f) => s"(NOT (${compileFilter(f)}))" - case LessThan(attr, value) => s"$attr < ${compileValue(value)}" - case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" - case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" - case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" - case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'" - case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'" - case StringContains(attr, value) => s"${attr} LIKE '%${value}%'" - case IsNull(attr) => s"$attr IS NULL" - case IsNotNull(attr) => s"$attr IS NOT NULL" - case In(attr, value) => s"$attr IN (${compileValue(value)})" - case Or(f1, f2) => s"(${compileFilter(f1)}) OR (${compileFilter(f2)})" - case And(f1, f2) => s"(${compileFilter(f1)}) AND (${compileFilter(f2)})" - case _ => null + private def compileFilter(f: Filter): Option[String] = { + Option(f match { + case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" + case LessThan(attr, value) => s"$attr < ${compileValue(value)}" + case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" + case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" + case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case IsNull(attr) => s"$attr IS NULL" + case IsNotNull(attr) => s"$attr IS NOT NULL" + case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'" + case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'" + case StringContains(attr, value) => s"${attr} LIKE '%${value}%'" + case In(attr, value) => s"$attr IN (${compileValue(value)})" + case Not(f) => compileFilter(f).map(p => s"(NOT ($p))").getOrElse(null) + case Or(f1, f2) => + // We can't compile Or filter unless both sub-filters are compiled successfully. + // It applies too for the following And filter. + // If we can make sure compileFilter supports all filters, we can remove this check. 
+ val or = Seq(f1, f2).map(compileFilter(_)).flatten + if (or.size == 2) { + or.map(p => s"($p)").mkString(" OR ") + } else { + null + } + case And(f1, f2) => + val and = Seq(f1, f2).map(compileFilter(_)).flatten + if (and.size == 2) { + and.map(p => s"($p)").mkString(" AND ") + } else { + null + } + case _ => null + }) } @@ -280,25 +297,21 @@ private[sql] class JDBCRDD( /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ - private val filterWhereClause: String = { - val filterStrings = filters.map(JDBCRDD.compileFilter).filter(_ != null) - if (filterStrings.size > 0) { - val sb = new StringBuilder("WHERE ") - filterStrings.foreach(x => sb.append(x).append(" AND ")) - sb.substring(0, sb.length - 5) - } else "" - } + private val filterWhereClause: String = + filters.map(JDBCRDD.compileFilter).flatten.mkString(" AND ") /** * A WHERE clause representing both `filters`, if any, and the current partition. */ private def getWhereClause(part: JDBCPartition): String = { if (part.whereClause != null && filterWhereClause.length > 0) { - filterWhereClause + " AND " + part.whereClause + "WHERE " + filterWhereClause + " AND " + part.whereClause } else if (part.whereClause != null) { "WHERE " + part.whereClause + } else if (filterWhereClause.length > 0) { + "WHERE " + filterWhereClause } else { - filterWhereClause + "" } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 227791f75b3d..938bd7494325 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -190,7 +190,7 @@ class JDBCSuite extends SparkFunSuite assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) .collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) - .collect().size === 2) + .collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) .collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " @@ -453,8 +453,8 @@ class JDBCSuite extends SparkFunSuite } test("compile filters") { - val compileFilter = PrivateMethod[String]('compileFilter) - def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) + val compileFilter = PrivateMethod[Option[String]]('compileFilter) + def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) getOrElse("") assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))") assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def"))) @@ -473,6 +473,7 @@ class JDBCSuite extends SparkFunSuite === "(NOT (col1 IN ('mno', 'pqr')))") assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") + assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def"))) === "") } test("Dialect unregister") { From c1b9d83af5fe383f83bfe7da08a8be2f045e34fe Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 2 Jan 2016 00:04:48 -0800 Subject: [PATCH 4/6] [SPARK-10180][SQL] JDBC datasource are not processing EqualNullSafe filter This PR is followed by https://github.com/apache/spark/pull/8391. Previous PR fixes JDBCRDD to support null-safe equality comparison for JDBC datasource. 
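For context, the `compileFilter` case added below expands a null-safe comparison into plain SQL, so `col0 <=> 'abc'` is pushed down as the predicate in this sketch, matching the expansion asserted (wrapped inside an `And`) by the new test case:

```scala
// Pushed-down form of EqualNullSafe("col0", "abc") after this patch:
val pushedDown =
  "(NOT (col0 != 'abc' OR col0 IS NULL OR 'abc' IS NULL) " +
    "OR (col0 IS NULL AND 'abc' IS NULL))"
```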
This PR fixes the problem that it can actually return null as a result of the comparison resulting error as using the value of that comparison. Author: hyukjinkwon Author: HyukjinKwon Closes #8743 from HyukjinKwon/SPARK-10180. (cherry picked from commit 94f7a12b3c8e4a6ecd969893e562feb7ffba4c24) Signed-off-by: Dongjoon Hyun --- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 4 +++- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 1d561731d1cd..9b847cab0995 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -190,6 +190,9 @@ private[sql] object JDBCRDD extends Logging { private def compileFilter(f: Filter): Option[String] = { Option(f match { case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" + case EqualNullSafe(attr, value) => + s"(NOT ($attr != ${compileValue(value)} OR $attr IS NULL OR " + + s"${compileValue(value)} IS NULL) OR ($attr IS NULL AND ${compileValue(value)} IS NULL))" case LessThan(attr, value) => s"$attr < ${compileValue(value)}" case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" @@ -293,7 +296,6 @@ private[sql] class JDBCRDD( if (sb.length == 0) "1" else sb.substring(1) } - /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 938bd7494325..e8fd693165a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -185,6 +185,7 @@ class JDBCSuite extends SparkFunSuite assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) @@ -473,7 +474,9 @@ class JDBCSuite extends SparkFunSuite === "(NOT (col1 IN ('mno', 'pqr')))") assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") - assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def"))) === "") + assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def"))) + === "((NOT (col0 != 'abc' OR col0 IS NULL OR 'abc' IS NULL) " + + "OR (col0 IS NULL AND 'abc' IS NULL))) AND (col1 = 'def')") } test("Dialect unregister") { From 59b7e4c20c5da81b26039e7e049e3ab656bf1fbe Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 10 Feb 2016 09:45:13 +0800 Subject: [PATCH 5/6] [SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter Input: 
SELECT * FROM jdbcTable WHERE col0 = 'xxx' Current plan: ``` == Optimized Logical Plan == Project [col0#0,col1#1] +- Filter (col0#0 = xxx) +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver}) == Physical Plan == +- Filter (col0#0 = xxx) +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)] ``` This patch enables a plan below; ``` == Optimized Logical Plan == Project [col0#0,col1#1] +- Filter (col0#0 = xxx) +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver}) == Physical Plan == Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)] ``` Author: Takeshi YAMAMURO Closes #10427 from maropu/RemoveFilterInJdbcScan. (cherry picked from commit 6f710f9fd4f85370557b7705020ff16f2385e645) Signed-off-by: Dongjoon Hyun --- .../execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../datasources/jdbc/JDBCRelation.scala | 5 ++ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 58 +++++++++++++------ 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 9b847cab0995..3c55523645c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -187,7 +187,7 @@ private[sql] object JDBCRDD extends Logging { * Turns a single Filter into a String representing a SQL expression. * Returns None for an unhandled filter. 
*/ - private def compileFilter(f: Filter): Option[String] = { + private[jdbc] def compileFilter(f: Filter): Option[String] = { Option(f match { case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" case EqualNullSafe(attr, value) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 375266f1be42..07c3a107e6bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -90,6 +90,11 @@ private[sql] case class JDBCRelation( override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + // Check if JDBCRDD.compileFilter can accept input filters + override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { + filters.filter(JDBCRDD.compileFilter(_).isEmpty) + } + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row] JDBCRDD.scanTable( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index e8fd693165a9..0d278f2d2b1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -22,11 +22,13 @@ import java.sql.{Date, DriverManager, Timestamp} import java.util.{Calendar, GregorianCalendar, Properties} import org.h2.jdbc.JdbcSQLException -import org.scalatest.BeforeAndAfter -import org.scalatest.PrivateMethodTester +import org.scalatest.{BeforeAndAfter, PrivateMethodTester} +import org.apache.spark.sql.execution.ExplainCommand import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.Row +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -181,26 +183,34 @@ class JDBCSuite extends SparkFunSuite } test("SELECT * WHERE (simple predicates)") { - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) + def checkPushdown(df: DataFrame): DataFrame = { + val parentPlan = df.queryExecution.executedPlan + // Check if SparkPlan Filter is removed in a physical plan and + // the plan only has PhysicalRDD to scan JDBCRelation. 
+ assert(parentPlan.isInstanceOf[PhysicalRDD]) + assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + df + } + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) .collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) .collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) .collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + "AND THEID = 2")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) // This is a test to reflect discussion in SPARK-12218. // The older versions of spark have this kind of bugs in parquet data source. @@ -210,6 +220,16 @@ class JDBCSuite extends SparkFunSuite assert(df2.collect.toSet === Set(Row("mary", 2))) } + test("SELECT COUNT(1) WHERE (predicates)") { + // Check if an answer is correct when Filter is removed from operations such as count() which + // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in + // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters + // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD + // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more + // discussions. 
+ assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1))) + } + test("SELECT * WHERE (quoted strings)") { assert(sql("select * from foobar").where('NAME === "joe 'foo' \"bar\"").collect().size === 1) } From cf8fd941f3a2ee00740973302874e67712b8785e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 17 Jun 2016 17:11:38 -0700 Subject: [PATCH 6/6] [SPARK-15916][SQL] JDBC filter push down should respect operator precedence This PR fixes the problem that the precedence order is messed when pushing where-clause expression to JDBC layer. **Case 1:** For sql `select * from table where (a or b) and c`, the where-clause is wrongly converted to JDBC where-clause `a or (b and c)` after filter push down. The consequence is that JDBC may returns less or more rows than expected. **Case 2:** For sql `select * from table where always_false_condition`, the result table may not be empty if the JDBC RDD is partitioned using where-clause: ``` spark.read.jdbc(url, table, predicates = Array("partition 1 where clause", "partition 2 where clause"...) ``` Unit test. This PR also close #13640 Author: hyukjinkwon Author: Sean Zhong Closes #13743 from clockfly/SPARK-15916. (cherry picked from commit ebb9a3b6fd834e2c856a192b4455aab83e9c4dc8) Signed-off-by: Cheng Lian (cherry picked from commit b22b20db640e9fac20c5d54cc83964dc74393821) Signed-off-by: Dongjoon Hyun --- .../execution/datasources/jdbc/JDBCRDD.scala | 4 +-- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 3c55523645c4..2d77c7292bc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -300,14 +300,14 @@ private[sql] class JDBCRDD( * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ private val filterWhereClause: String = - filters.map(JDBCRDD.compileFilter).flatten.mkString(" AND ") + filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ") /** * A WHERE clause representing both `filters`, if any, and the current partition. */ private def getWhereClause(part: JDBCPartition): String = { if (part.whereClause != null && filterWhereClause.length > 0) { - "WHERE " + filterWhereClause + " AND " + part.whereClause + "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})" } else if (part.whereClause != null) { "WHERE " + part.whereClause } else if (filterWhereClause.length > 0) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 0d278f2d2b1a..ce0efa10c403 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -579,4 +579,30 @@ class JDBCSuite extends SparkFunSuite assert(oracleDialect.getJDBCType(StringType). 
map(_.databaseTypeDefinition).get == "VARCHAR2(255)") } + + private def assertEmptyQuery(sqlString: String): Unit = { + assert(sql(sqlString).collect().isEmpty) + } + + test("SPARK-15916: JDBC filter operator push down should respect operator precedence") { + val TRUE = "NAME != 'non_exists'" + val FALSE1 = "THEID > 1000000000" + val FALSE2 = "THEID < -1000000000" + + assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2") + assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)") + + // Tests JDBCPartition whereClause clause push down. + withTempTable("tempFrame") { + val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE" + val df = sqlContext.read.jdbc( + urlWithUserAndPass, + "TEST.PEOPLE", + predicates = Array[String](jdbcPartitionWhereClause), + new Properties) + + df.registerTempTable("tempFrame") + assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2") + } + } }
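To make the precedence problem fixed by this last patch concrete, here is a small standalone sketch (plain Scala strings, not the Spark code itself) contrasting the old bare concatenation with the parenthesized form the patch switches to:

```scala
// Two filters pushed down for: SELECT * FROM table WHERE (a OR b) AND c
// (Catalyst splits the conjunction, and compileFilter renders the Or as "(a) OR (b)".)
val compiledFilters = Seq("(a) OR (b)", "c")

// Before the patch the fragments were joined bare, so the generated clause read
// "(a) OR (b) AND c", which SQL parses as "(a) OR ((b) AND c)" -- case 1 above.
val before = compiledFilters.mkString(" AND ")

// After the patch every compiled fragment (and, likewise, the JDBCPartition
// whereClause) is wrapped in parentheses before joining, preserving the grouping.
val after = compiledFilters.map(p => s"($p)").mkString(" AND ")
// after == "((a) OR (b)) AND (c)"
```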