
Commit d0b1891

yjshen authored and liancheng committed
[SPARK-9927] [SQL] Revert 8049 since it's pushing wrong filter down
I made a mistake in #8049 by casting the literal value to the attribute's data type, which could silently truncate the literal value and push a wrong filter down to the data source.

JIRA: https://issues.apache.org/jira/browse/SPARK-9927

Author: Yijie Shen <[email protected]>

Closes #8157 from yjshen/rever8049.
1 parent d7eb371 commit d0b1891
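
For context, a minimal, self-contained sketch (plain Scala, no Spark dependencies; all names are illustrative, not Spark's) of the truncation bug behind this revert: casting the literal back to the attribute's data type can silently change its value, so the filter pushed to the data source no longer agrees with the original predicate.

object TruncatingPushdownDemo {
  def main(args: Array[String]): Unit = {
    // Original predicate: cast(intCol as double) = 1234.5 -- never true for any Int.
    val originalPredicate: Int => Boolean = v => v.toDouble == 1234.5

    // What the reverted change effectively did: cast the literal to the
    // attribute's type (Double -> Int truncates) and push `intCol = 1234` down.
    val truncatedLiteral: Int = 1234.5.toInt // 1234 -- the value silently changed
    val pushedPredicate: Int => Boolean = v => v == truncatedLiteral

    val data = Seq(1233, 1234, 1235)
    println(data.filter(originalPredicate)) // List() -- the correct answer
    println(data.filter(pushedPredicate))   // List(1234) -- wrong rows selected
  }
}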

File tree: 3 files changed, +3 -64 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 28 deletions
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{TimestampType, DateType, StringType, StructType}
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -343,17 +343,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
    * and convert them.
    */
   protected[sql] def selectFilters(filters: Seq[Expression]) = {
-    import CatalystTypeConverters._
-
     def translate(predicate: Expression): Option[Filter] = predicate match {
       case expressions.EqualTo(a: Attribute, Literal(v, _)) =>
         Some(sources.EqualTo(a.name, v))
       case expressions.EqualTo(Literal(v, _), a: Attribute) =>
         Some(sources.EqualTo(a.name, v))
-      case expressions.EqualTo(Cast(a: Attribute, _), l: Literal) =>
-        Some(sources.EqualTo(a.name, convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
-      case expressions.EqualTo(l: Literal, Cast(a: Attribute, _)) =>
-        Some(sources.EqualTo(a.name, convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
 
       case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) =>
         Some(sources.EqualNullSafe(a.name, v))
@@ -364,41 +358,21 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         Some(sources.GreaterThan(a.name, v))
       case expressions.GreaterThan(Literal(v, _), a: Attribute) =>
         Some(sources.LessThan(a.name, v))
-      case expressions.GreaterThan(Cast(a: Attribute, _), l: Literal) =>
-        Some(sources.GreaterThan(a.name, convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
-      case expressions.GreaterThan(l: Literal, Cast(a: Attribute, _)) =>
-        Some(sources.LessThan(a.name, convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
 
       case expressions.LessThan(a: Attribute, Literal(v, _)) =>
         Some(sources.LessThan(a.name, v))
       case expressions.LessThan(Literal(v, _), a: Attribute) =>
         Some(sources.GreaterThan(a.name, v))
-      case expressions.LessThan(Cast(a: Attribute, _), l: Literal) =>
-        Some(sources.LessThan(a.name, convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
-      case expressions.LessThan(l: Literal, Cast(a: Attribute, _)) =>
-        Some(sources.GreaterThan(a.name, convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
 
       case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, _)) =>
         Some(sources.GreaterThanOrEqual(a.name, v))
       case expressions.GreaterThanOrEqual(Literal(v, _), a: Attribute) =>
         Some(sources.LessThanOrEqual(a.name, v))
-      case expressions.GreaterThanOrEqual(Cast(a: Attribute, _), l: Literal) =>
-        Some(sources.GreaterThanOrEqual(a.name,
-          convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
-      case expressions.GreaterThanOrEqual(l: Literal, Cast(a: Attribute, _)) =>
-        Some(sources.LessThanOrEqual(a.name,
-          convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
 
       case expressions.LessThanOrEqual(a: Attribute, Literal(v, _)) =>
         Some(sources.LessThanOrEqual(a.name, v))
       case expressions.LessThanOrEqual(Literal(v, _), a: Attribute) =>
         Some(sources.GreaterThanOrEqual(a.name, v))
-      case expressions.LessThanOrEqual(Cast(a: Attribute, _), l: Literal) =>
-        Some(sources.LessThanOrEqual(a.name,
-          convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
-      case expressions.LessThanOrEqual(l: Literal, Cast(a: Attribute, _)) =>
-        Some(sources.GreaterThanOrEqual(a.name,
-          convertToScala(Cast(l, a.dataType).eval(), a.dataType)))
 
       case expressions.InSet(a: Attribute, set) =>
         Some(sources.In(a.name, set.toArray))
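
With the Cast cases reverted, only bare attribute-versus-literal comparisons are translated into source filters; a predicate whose attribute is wrapped in a Cast falls through to None and is evaluated by Spark itself after the scan. A minimal sketch of that dispatch shape, using simplified stand-ins rather than Spark's actual Catalyst and sources classes:

object TranslateShape {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Any) extends Expr
  case class Cast(child: Expr) extends Expr
  case class Gt(left: Expr, right: Expr) extends Expr

  sealed trait Filter
  case class GreaterThan(attr: String, value: Any) extends Filter
  case class LessThan(attr: String, value: Any) extends Filter

  // Only bare attribute-vs-literal shapes are translated; Cast falls through.
  def translate(e: Expr): Option[Filter] = e match {
    case Gt(Attr(n), Lit(v)) => Some(GreaterThan(n, v))
    case Gt(Lit(v), Attr(n)) => Some(LessThan(n, v)) // literal on the left flips the operator
    case _                   => None // e.g. Gt(Cast(Attr("a")), Lit(10)) stays a Spark-side filter
  }

  def main(args: Array[String]): Unit = {
    println(translate(Gt(Attr("a"), Lit(10))))       // Some(GreaterThan(a,10))
    println(translate(Gt(Cast(Attr("a")), Lit(10)))) // None
  }
}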

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -284,7 +284,7 @@ private[sql] class JDBCRDD(
   /**
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
    */
-  val filterWhereClause: String = {
+  private val filterWhereClause: String = {
     val filterStrings = filters map compileFilter filter (_ != null)
     if (filterStrings.size > 0) {
       val sb = new StringBuilder("WHERE ")
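
This visibility change means code outside JDBCRDD can no longer read the generated clause directly, which is why the test that inspected it is removed below. A small hypothetical illustration of the effect (placeholder class, not Spark's API):

object PrivateClauseDemo {
  class Scan(filters: Seq[String]) {
    // After this commit the WHERE clause is an implementation detail of the scan:
    private val filterWhereClause: String =
      if (filters.nonEmpty) filters.mkString("WHERE ", " AND ", "") else ""

    // Callers see only the final query text, not the clause itself.
    def buildQuery(table: String): String =
      s"SELECT * FROM $table $filterWhereClause".trim
  }

  def main(args: Array[String]): Unit = {
    val scan = new Scan(Seq("A > 10", "B = 'x'"))
    println(scan.buildQuery("TEST.PEOPLE")) // SELECT * FROM TEST.PEOPLE WHERE A > 10 AND B = 'x'
    // scan.filterWhereClause               // would not compile: the field is private
  }
}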

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 0 additions & 35 deletions
@@ -25,8 +25,6 @@ import org.h2.jdbc.JdbcSQLException
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
-import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -150,18 +148,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
         |OPTIONS (url '$url', dbtable 'TEST.FLTTYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
 
-    conn.prepareStatement("create table test.decimals (a DECIMAL(7, 2), b DECIMAL(4, 0))").
-      executeUpdate()
-    conn.prepareStatement("insert into test.decimals values (12345.67, 1234)").executeUpdate()
-    conn.prepareStatement("insert into test.decimals values (34567.89, 1428)").executeUpdate()
-    conn.commit()
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE decimals
-        |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'TEST.DECIMALS', user 'testUser', password 'testPass')
-      """.stripMargin.replaceAll("\n", " "))
-
     conn.prepareStatement(
       s"""
         |create table test.nulltypes (a INT, b BOOLEAN, c TINYINT, d BINARY(20), e VARCHAR(20),
@@ -458,25 +444,4 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
     assert(agg.getCatalystType(0, "", 1, null) === Some(LongType))
     assert(agg.getCatalystType(1, "", 1, null) === Some(StringType))
   }
-
-  test("SPARK-9182: filters are not passed through to jdbc source") {
-    def checkPushedFilter(query: String, filterStr: String): Unit = {
-      val rddOpt = sql(query).queryExecution.executedPlan.collectFirst {
-        case PhysicalRDD(_, rdd: JDBCRDD, _) => rdd
-      }
-      assert(rddOpt.isDefined)
-      val pushedFilterStr = rddOpt.get.filterWhereClause
-      assert(pushedFilterStr.contains(filterStr),
-        s"Expected to push [$filterStr], actually we pushed [$pushedFilterStr]")
-    }
-
-    checkPushedFilter("select * from foobar where NAME = 'fred'", "NAME = 'fred'")
-    checkPushedFilter("select * from inttypes where A > '15'", "A > 15")
-    checkPushedFilter("select * from inttypes where C <= 20", "C <= 20")
-    checkPushedFilter("select * from decimals where A > 1000", "A > 1000.00")
-    checkPushedFilter("select * from decimals where A > 1000 AND A < 2000",
-      "A > 1000.00 AND A < 2000.00")
-    checkPushedFilter("select * from decimals where A = 2000 AND B > 20", "A = 2000.00 AND B > 20")
-    checkPushedFilter("select * from timetypes where B > '1998-09-10'", "B > 1998-09-10")
-  }
 }
