-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-18593][SQL] JDBCRDD returns incorrect results for filters on CHAR of PostgreSQL #16021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
d712319
[SPARK-12446][SQL] Add unit tests for JDBCRDD internal functions
maropu abaae83
[SPARK-12409][SPARK-12387][SPARK-12391][SQL] Support AND/OR/IN/LIKE p…
maropu bb9aa2c
[SPARK-12409][SPARK-12387][SPARK-12391][SQL] Refactor filter pushdown…
viirya c1b9d83
[SPARK-10180][SQL] JDBC datasource are not processing EqualNullSafe f…
HyukjinKwon 59b7e4c
[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removi…
maropu cf8fd94
[SPARK-15916][SQL] JDBC filter push down should respect operator prec…
HyukjinKwon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,18 +18,25 @@ | |
| package org.apache.spark.sql.jdbc | ||
|
|
||
| import java.math.BigDecimal | ||
| import java.sql.DriverManager | ||
| import java.sql.{Date, DriverManager, Timestamp} | ||
| import java.util.{Calendar, GregorianCalendar, Properties} | ||
|
|
||
| import org.h2.jdbc.JdbcSQLException | ||
| import org.scalatest.BeforeAndAfter | ||
| import org.scalatest.{BeforeAndAfter, PrivateMethodTester} | ||
|
|
||
| import org.apache.spark.sql.execution.ExplainCommand | ||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.sql.{DataFrame, Row} | ||
| import org.apache.spark.sql.execution.PhysicalRDD | ||
| import org.apache.spark.sql.execution.datasources.LogicalRelation | ||
| import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD | ||
| import org.apache.spark.sql.test.SharedSQLContext | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.sql.sources._ | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext { | ||
| class JDBCSuite extends SparkFunSuite | ||
| with BeforeAndAfter with PrivateMethodTester with SharedSQLContext { | ||
| import testImplicits._ | ||
|
|
||
| val url = "jdbc:h2:mem:testdb0" | ||
|
|
@@ -176,12 +183,51 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext | |
| } | ||
|
|
||
| test("SELECT * WHERE (simple predicates)") { | ||
| assert(sql("SELECT * FROM foobar WHERE THEID < 1").collect().size === 0) | ||
| assert(sql("SELECT * FROM foobar WHERE THEID != 2").collect().size === 2) | ||
| assert(sql("SELECT * FROM foobar WHERE THEID = 1").collect().size === 1) | ||
| assert(sql("SELECT * FROM foobar WHERE NAME = 'fred'").collect().size === 1) | ||
| assert(sql("SELECT * FROM foobar WHERE NAME > 'fred'").collect().size === 2) | ||
| assert(sql("SELECT * FROM foobar WHERE NAME != 'fred'").collect().size === 2) | ||
| def checkPushdown(df: DataFrame): DataFrame = { | ||
| val parentPlan = df.queryExecution.executedPlan | ||
| // Check if SparkPlan Filter is removed in a physical plan and | ||
| // the plan only has PhysicalRDD to scan JDBCRelation. | ||
| assert(parentPlan.isInstanceOf[PhysicalRDD]) | ||
| assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) | ||
| df | ||
| } | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) | ||
| .collect().size == 2) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) | ||
| .collect().size == 2) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) | ||
| .collect().size == 2) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " | ||
| + "AND THEID = 2")).collect().size == 2) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) | ||
| assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) | ||
| assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) | ||
| assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) | ||
|
|
||
| // This is a test to reflect discussion in SPARK-12218. | ||
| // The older versions of spark have this kind of bugs in parquet data source. | ||
| val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')") | ||
| val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')") | ||
| assert(df1.collect.toSet === Set(Row("mary", 2))) | ||
| assert(df2.collect.toSet === Set(Row("mary", 2))) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here, some of the original commits had |
||
| } | ||
|
|
||
| test("SELECT COUNT(1) WHERE (predicates)") { | ||
| // Check if an answer is correct when Filter is removed from operations such as count() which | ||
| // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in | ||
| // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters | ||
| // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD | ||
| // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more | ||
| // discussions. | ||
| assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1))) | ||
| } | ||
|
|
||
| test("SELECT * WHERE (quoted strings)") { | ||
|
|
@@ -427,6 +473,32 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext | |
| assert(DerbyColumns === Seq(""""abc"""", """"key"""")) | ||
| } | ||
|
|
||
| test("compile filters") { | ||
| val compileFilter = PrivateMethod[Option[String]]('compileFilter) | ||
| def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) getOrElse("") | ||
| assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") | ||
| assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))") | ||
| assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def"))) | ||
| === "(col0 = 0) AND (col1 = 'def')") | ||
| assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi"))) | ||
| === "(col0 = 2) OR (col1 = 'ghi')") | ||
| assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") | ||
| assert(doCompileFilter(LessThan("col3", | ||
| Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") | ||
| assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04"))) === "col4 < '1983-08-04'") | ||
| assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") | ||
| assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") | ||
| assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") | ||
| assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')") | ||
| assert(doCompileFilter(Not(In("col1", Array("mno", "pqr")))) | ||
| === "(NOT (col1 IN ('mno', 'pqr')))") | ||
| assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") | ||
| assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") | ||
| assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def"))) | ||
| === "((NOT (col0 != 'abc' OR col0 IS NULL OR 'abc' IS NULL) " | ||
| + "OR (col0 IS NULL AND 'abc' IS NULL))) AND (col1 = 'def')") | ||
| } | ||
|
|
||
| test("Dialect unregister") { | ||
| JdbcDialects.registerDialect(testH2Dialect) | ||
| JdbcDialects.unregisterDialect(testH2Dialect) | ||
|
|
@@ -507,4 +579,30 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext | |
| assert(oracleDialect.getJDBCType(StringType). | ||
| map(_.databaseTypeDefinition).get == "VARCHAR2(255)") | ||
| } | ||
|
|
||
| private def assertEmptyQuery(sqlString: String): Unit = { | ||
| assert(sql(sqlString).collect().isEmpty) | ||
| } | ||
|
|
||
| test("SPARK-15916: JDBC filter operator push down should respect operator precedence") { | ||
| val TRUE = "NAME != 'non_exists'" | ||
| val FALSE1 = "THEID > 1000000000" | ||
| val FALSE2 = "THEID < -1000000000" | ||
|
|
||
| assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2") | ||
| assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)") | ||
|
|
||
| // Tests JDBCPartition whereClause clause push down. | ||
| withTempTable("tempFrame") { | ||
| val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE" | ||
| val df = sqlContext.read.jdbc( | ||
| urlWithUserAndPass, | ||
| "TEST.PEOPLE", | ||
| predicates = Array[String](jdbcPartitionWhereClause), | ||
| new Properties) | ||
|
|
||
| df.registerTempTable("tempFrame") | ||
| assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2") | ||
| } | ||
| } | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this change related?