Skip to content

Commit cf8fd94

Browse files
HyukjinKwon authored and dongjoon-hyun committed
[SPARK-15916][SQL] JDBC filter push down should respect operator precedence
This PR fixes the problem that the precedence order is messed up when pushing where-clause expressions to the JDBC layer. **Case 1:** For sql `select * from table where (a or b) and c`, the where-clause is wrongly converted to JDBC where-clause `a or (b and c)` after filter push down. The consequence is that JDBC may return fewer or more rows than expected. **Case 2:** For sql `select * from table where always_false_condition`, the result table may not be empty if the JDBC RDD is partitioned using where-clause: ``` spark.read.jdbc(url, table, predicates = Array("partition 1 where clause", "partition 2 where clause"...) ``` Unit test. This PR also closes #13640 Author: hyukjinkwon <[email protected]> Author: Sean Zhong <[email protected]> Closes #13743 from clockfly/SPARK-15916. (cherry picked from commit ebb9a3b) Signed-off-by: Cheng Lian <[email protected]> (cherry picked from commit b22b20d) Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 59b7e4c commit cf8fd94

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,14 @@ private[sql] class JDBCRDD(
300300
* `filters`, but as a WHERE clause suitable for injection into a SQL query.
301301
*/
302302
private val filterWhereClause: String =
303-
filters.map(JDBCRDD.compileFilter).flatten.mkString(" AND ")
303+
filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ")
304304

305305
/**
306306
* A WHERE clause representing both `filters`, if any, and the current partition.
307307
*/
308308
private def getWhereClause(part: JDBCPartition): String = {
309309
if (part.whereClause != null && filterWhereClause.length > 0) {
310-
"WHERE " + filterWhereClause + " AND " + part.whereClause
310+
"WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})"
311311
} else if (part.whereClause != null) {
312312
"WHERE " + part.whereClause
313313
} else if (filterWhereClause.length > 0) {

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,4 +579,30 @@ class JDBCSuite extends SparkFunSuite
579579
assert(oracleDialect.getJDBCType(StringType).
580580
map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
581581
}
582+
583+
private def assertEmptyQuery(sqlString: String): Unit = {
584+
assert(sql(sqlString).collect().isEmpty)
585+
}
586+
587+
test("SPARK-15916: JDBC filter operator push down should respect operator precedence") {
588+
val TRUE = "NAME != 'non_exists'"
589+
val FALSE1 = "THEID > 1000000000"
590+
val FALSE2 = "THEID < -1000000000"
591+
592+
assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2")
593+
assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)")
594+
595+
// Tests JDBCPartition whereClause clause push down.
596+
withTempTable("tempFrame") {
597+
val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE"
598+
val df = sqlContext.read.jdbc(
599+
urlWithUserAndPass,
600+
"TEST.PEOPLE",
601+
predicates = Array[String](jdbcPartitionWhereClause),
602+
new Properties)
603+
604+
df.registerTempTable("tempFrame")
605+
assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")
606+
}
607+
}
582608
}

0 commit comments

Comments
 (0)