Skip to content

Commit 59b7e4c

Browse files
maropu authored and dongjoon-hyun committed
[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter
Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx' Current plan: ``` == Optimized Logical Plan == Project [col0#0,col1#1] +- Filter (col0#0 = xxx) +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver}) == Physical Plan == +- Filter (col0#0 = xxx) +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)] ``` This patch enables a plan below; ``` == Optimized Logical Plan == Project [col0#0,col1#1] +- Filter (col0#0 = xxx) +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver}) == Physical Plan == Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)] ``` Author: Takeshi YAMAMURO <[email protected]> Closes #10427 from maropu/RemoveFilterInJdbcScan. (cherry picked from commit 6f710f9) Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent c1b9d83 commit 59b7e4c

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private[sql] object JDBCRDD extends Logging {
187187
* Turns a single Filter into a String representing a SQL expression.
188188
* Returns None for an unhandled filter.
189189
*/
190-
private def compileFilter(f: Filter): Option[String] = {
190+
private[jdbc] def compileFilter(f: Filter): Option[String] = {
191191
Option(f match {
192192
case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
193193
case EqualNullSafe(attr, value) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(
9090

9191
override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
9292

93+
// Returns the subset of `filters` that JDBCRDD.compileFilter cannot translate
// into a SQL WHERE-clause fragment. Spark re-evaluates only these; every other
// filter is fully pushed down to the JDBC source, which lets the planner drop
// its own Filter node above the scan (see the commit message's plan diff).
94+
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
95+
filters.filter(JDBCRDD.compileFilter(_).isEmpty)
96+
}
97+
9398
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
9499
// Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
95100
JDBCRDD.scanTable(

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ import java.sql.{Date, DriverManager, Timestamp}
2222
import java.util.{Calendar, GregorianCalendar, Properties}
2323

2424
import org.h2.jdbc.JdbcSQLException
25-
import org.scalatest.BeforeAndAfter
26-
import org.scalatest.PrivateMethodTester
25+
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
2726

27+
import org.apache.spark.sql.execution.ExplainCommand
2828
import org.apache.spark.SparkFunSuite
29-
import org.apache.spark.sql.Row
29+
import org.apache.spark.sql.{DataFrame, Row}
30+
import org.apache.spark.sql.execution.PhysicalRDD
31+
import org.apache.spark.sql.execution.datasources.LogicalRelation
3032
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
3133
import org.apache.spark.sql.test.SharedSQLContext
3234
import org.apache.spark.sql.types._
@@ -181,26 +183,34 @@ class JDBCSuite extends SparkFunSuite
181183
}
182184

183185
test("SELECT * WHERE (simple predicates)") {
184-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
185-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
186-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
187-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
188-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
189-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
190-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
191-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
186+
def checkPushdown(df: DataFrame): DataFrame = {
187+
// Returns `df` unchanged after asserting that its executed physical plan is a
// bare PhysicalRDD over JDBCRelation — i.e., the WHERE predicate was fully
// pushed down and no Spark-side Filter node remains above the scan.
val parentPlan = df.queryExecution.executedPlan
188+
// Check if SparkPlan Filter is removed in a physical plan and
189+
// the plan only has PhysicalRDD to scan JDBCRelation.
190+
assert(parentPlan.isInstanceOf[PhysicalRDD])
191+
assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
192+
df
193+
}
194+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
195+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
196+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
197+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
198+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
199+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
200+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
201+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
192202
.collect().size == 2)
193-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
203+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
194204
.collect().size == 2)
195-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
205+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
196206
.collect().size == 2)
197-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
207+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
198208
+ "AND THEID = 2")).collect().size == 2)
199-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
200-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
201-
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
202-
assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
203-
assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
209+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
210+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
211+
assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
212+
assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
213+
assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
204214

205215
// This is a test to reflect discussion in SPARK-12218.
206216
// The older versions of spark have this kind of bugs in parquet data source.
@@ -210,6 +220,16 @@ class JDBCSuite extends SparkFunSuite
210220
assert(df2.collect.toSet === Set(Row("mary", 2)))
211221
}
212222

223+
test("SELECT COUNT(1) WHERE (predicates)") {
224+
// Check if an answer is correct when Filter is removed from operations such as count() which
225+
// does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in
226+
// org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters
227+
// are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD
228+
// correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more
229+
// discussions.
230+
assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1)))
231+
}
232+
213233
test("SELECT * WHERE (quoted strings)") {
214234
assert(sql("select * from foobar").where('NAME === "joe 'foo' \"bar\"").collect().size === 1)
215235
}

0 commit comments

Comments
 (0)