From bcfef466ec4cc74c4fd6b6180a9008600c6ca07b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 13 Jun 2016 19:32:16 +0900 Subject: [PATCH 1/2] Consider top level and/or precedence for parenthesis --- .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 8d0906e57425..8eb6857676ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -305,7 +305,7 @@ private[sql] class JDBCRDD( * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ private val filterWhereClause: String = - filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ") + filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ") /** * A WHERE clause representing both `filters`, if any, and the current partition. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index abb7918ae607..7df9c34e3f79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -233,6 +233,10 @@ class JDBCSuite extends SparkFunSuite assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) + val orPrecedenceSql = + "SELECT * FROM foobar WHERE (NAME = 'fred' OR THEID = 100) AND THEID < 1" + assert(checkPushdown(sql(orPrecedenceSql)).collect().size == 0) + assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) From 2f1ada38d6a50916b3f5c9a681e01d8f13c0ac4b Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Fri, 17 Jun 2016 14:10:34 -0700 Subject: [PATCH 2/2] fix partition where clause pushdown --- .../execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 30 ++++++++++++++++--- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 8eb6857676ac..44cfbb9fbd81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -312,7 +312,7 @@ private[sql] class JDBCRDD( */ private def getWhereClause(part: JDBCPartition): String = { if (part.whereClause != null && filterWhereClause.length > 0) { - "WHERE " + filterWhereClause 
+ " AND " + part.whereClause + "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})" } else if (part.whereClause != null) { "WHERE " + part.whereClause } else if (filterWhereClause.length > 0) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7df9c34e3f79..d6ec40c18be2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -233,10 +233,6 @@ class JDBCSuite extends SparkFunSuite assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) - val orPrecedenceSql = - "SELECT * FROM foobar WHERE (NAME = 'fred' OR THEID = 100) AND THEID < 1" - assert(checkPushdown(sql(orPrecedenceSql)).collect().size == 0) - assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) @@ -665,4 +661,30 @@ class JDBCSuite extends SparkFunSuite assert(oracleDialect.getJDBCType(StringType). map(_.databaseTypeDefinition).get == "VARCHAR2(255)") } + + private def assertEmptyQuery(sqlString: String): Unit = { + assert(sql(sqlString).collect().isEmpty) + } + + test("SPARK-15916: JDBC filter operator push down should respect operator precedence") { + val TRUE = "NAME != 'non_exists'" + val FALSE1 = "THEID > 1000000000" + val FALSE2 = "THEID < -1000000000" + + assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2") + assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)") + + // Tests JDBCPartition whereClause push down.
+ withTempTable("tempFrame") { + val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE" + val df = spark.read.jdbc( + urlWithUserAndPass, + "TEST.PEOPLE", + predicates = Array[String](jdbcPartitionWhereClause), + new Properties) + + df.createOrReplaceTempView("tempFrame") + assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2") + } + } }