diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index fad482b42735..2d77c7292bc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -165,8 +165,64 @@ private[sql] object JDBCRDD extends Logging { * @return A Catalyst schema corresponding to columns in the given order. */ private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { - val fieldMap = Map(schema.fields map { x => x.metadata.getString("name") -> x }: _*) - new StructType(columns map { name => fieldMap(name) }) + val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*) + new StructType(columns.map(name => fieldMap(name))) + } + + /** + * Converts value to SQL expression. + */ + private def compileValue(value: Any): Any = value match { + case stringValue: String => s"'${escapeSql(stringValue)}'" + case timestampValue: Timestamp => "'" + timestampValue + "'" + case dateValue: Date => "'" + dateValue + "'" + case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") + case _ => value + } + + private def escapeSql(value: String): String = + if (value == null) null else StringUtils.replace(value, "'", "''") + + /** + * Turns a single Filter into a String representing a SQL expression. + * Returns None for an unhandled filter. + */ + private[jdbc] def compileFilter(f: Filter): Option[String] = { + Option(f match { + case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" + case EqualNullSafe(attr, value) => + s"(NOT ($attr != ${compileValue(value)} OR $attr IS NULL OR " + + s"${compileValue(value)} IS NULL) OR ($attr IS NULL AND ${compileValue(value)} IS NULL))" + case LessThan(attr, value) => s"$attr < ${compileValue(value)}" + case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" + case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" + case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case IsNull(attr) => s"$attr IS NULL" + case IsNotNull(attr) => s"$attr IS NOT NULL" + case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'" + case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'" + case StringContains(attr, value) => s"${attr} LIKE '%${value}%'" + case In(attr, value) => s"$attr IN (${compileValue(value)})" + case Not(f) => compileFilter(f).map(p => s"(NOT ($p))").getOrElse(null) + case Or(f1, f2) => + // We can't compile Or filter unless both sub-filters are compiled successfully. + // It applies too for the following And filter. + // If we can make sure compileFilter supports all filters, we can remove this check. + val or = Seq(f1, f2).map(compileFilter(_)).flatten + if (or.size == 2) { + or.map(p => s"($p)").mkString(" OR ") + } else { + null + } + case And(f1, f2) => + val and = Seq(f1, f2).map(compileFilter(_)).flatten + if (and.size == 2) { + and.map(p => s"($p)").mkString(" AND ") + } else { + null + } + case _ => null + }) } @@ -240,54 +296,24 @@ private[sql] class JDBCRDD( if (sb.length == 0) "1" else sb.substring(1) } - /** - * Converts value to SQL expression. - */ - private def compileValue(value: Any): Any = value match { - case stringValue: String => s"'${escapeSql(stringValue)}'" - case timestampValue: Timestamp => "'" + timestampValue + "'" - case dateValue: Date => "'" + dateValue + "'" - case _ => value - } - - private def escapeSql(value: String): String = - if (value == null) null else StringUtils.replace(value, "'", "''") - - /** - * Turns a single Filter into a String representing a SQL expression. - * Returns null for an unhandled filter. - */ - private def compileFilter(f: Filter): String = f match { - case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" - case LessThan(attr, value) => s"$attr < ${compileValue(value)}" - case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" - case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" - case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" - case _ => null - } - /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ - private val filterWhereClause: String = { - val filterStrings = filters map compileFilter filter (_ != null) - if (filterStrings.size > 0) { - val sb = new StringBuilder("WHERE ") - filterStrings.foreach(x => sb.append(x).append(" AND ")) - sb.substring(0, sb.length - 5) - } else "" - } + private val filterWhereClause: String = + filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ") /** * A WHERE clause representing both `filters`, if any, and the current partition. */ private def getWhereClause(part: JDBCPartition): String = { if (part.whereClause != null && filterWhereClause.length > 0) { - filterWhereClause + " AND " + part.whereClause + "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})" } else if (part.whereClause != null) { "WHERE " + part.whereClause + } else if (filterWhereClause.length > 0) { + "WHERE " + filterWhereClause } else { - filterWhereClause + "" } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 375266f1be42..07c3a107e6bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -90,6 +90,11 @@ private[sql] case class JDBCRelation( override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + // Check if JDBCRDD.compileFilter can accept input filters + override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { + filters.filter(JDBCRDD.compileFilter(_).isEmpty) + } + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row] JDBCRDD.scanTable( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 4b389add63c4..ce0efa10c403 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -18,18 +18,25 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal -import java.sql.DriverManager +import java.sql.{Date, DriverManager, Timestamp} import java.util.{Calendar, GregorianCalendar, Properties} import org.h2.jdbc.JdbcSQLException -import org.scalatest.BeforeAndAfter +import org.scalatest.{BeforeAndAfter, PrivateMethodTester} +import org.apache.spark.sql.execution.ExplainCommand import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.sql.sources._ import org.apache.spark.util.Utils -class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext { +class JDBCSuite extends SparkFunSuite + with BeforeAndAfter with PrivateMethodTester with SharedSQLContext { import testImplicits._ val url = "jdbc:h2:mem:testdb0" @@ -176,12 +183,51 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext } test("SELECT * WHERE (simple predicates)") { - assert(sql("SELECT * FROM foobar WHERE THEID < 1").collect().size === 0) - assert(sql("SELECT * FROM foobar WHERE THEID != 2").collect().size === 2) - assert(sql("SELECT * FROM foobar WHERE THEID = 1").collect().size === 1) - assert(sql("SELECT * FROM foobar WHERE NAME = 'fred'").collect().size === 1) - assert(sql("SELECT * FROM foobar WHERE NAME > 'fred'").collect().size === 2) - assert(sql("SELECT * FROM foobar WHERE NAME != 'fred'").collect().size === 2) + def checkPushdown(df: DataFrame): DataFrame = { + val parentPlan = df.queryExecution.executedPlan + // Check if SparkPlan Filter is removed in a physical plan and + // the plan only has PhysicalRDD to scan JDBCRelation. + assert(parentPlan.isInstanceOf[PhysicalRDD]) + assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + df + } + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) + .collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) + .collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) + .collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + + "AND THEID = 2")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) + + // This is a test to reflect discussion in SPARK-12218. + // The older versions of spark have this kind of bugs in parquet data source. + val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')") + val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')") + assert(df1.collect.toSet === Set(Row("mary", 2))) + assert(df2.collect.toSet === Set(Row("mary", 2))) + } + + test("SELECT COUNT(1) WHERE (predicates)") { + // Check if an answer is correct when Filter is removed from operations such as count() which + // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in + // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters + // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD + // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more + // discussions. + assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1))) } test("SELECT * WHERE (quoted strings)") { @@ -427,6 +473,32 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(DerbyColumns === Seq(""""abc"""", """"key"""")) } + test("compile filters") { + val compileFilter = PrivateMethod[Option[String]]('compileFilter) + def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) getOrElse("") + assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") + assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))") + assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def"))) + === "(col0 = 0) AND (col1 = 'def')") + assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi"))) + === "(col0 = 2) OR (col1 = 'ghi')") + assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") + assert(doCompileFilter(LessThan("col3", + Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") + assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04"))) === "col4 < '1983-08-04'") + assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") + assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") + assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") + assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')") + assert(doCompileFilter(Not(In("col1", Array("mno", "pqr")))) + === "(NOT (col1 IN ('mno', 'pqr')))") + assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") + assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") + assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def"))) + === "((NOT (col0 != 'abc' OR col0 IS NULL OR 'abc' IS NULL) " + + "OR (col0 IS NULL AND 'abc' IS NULL))) AND (col1 = 'def')") + } + test("Dialect unregister") { JdbcDialects.registerDialect(testH2Dialect) JdbcDialects.unregisterDialect(testH2Dialect) @@ -507,4 +579,30 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(oracleDialect.getJDBCType(StringType). map(_.databaseTypeDefinition).get == "VARCHAR2(255)") } + + private def assertEmptyQuery(sqlString: String): Unit = { + assert(sql(sqlString).collect().isEmpty) + } + + test("SPARK-15916: JDBC filter operator push down should respect operator precedence") { + val TRUE = "NAME != 'non_exists'" + val FALSE1 = "THEID > 1000000000" + val FALSE2 = "THEID < -1000000000" + + assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2") + assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)") + + // Tests JDBCPartition whereClause clause push down. + withTempTable("tempFrame") { + val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE" + val df = sqlContext.read.jdbc( + urlWithUserAndPass, + "TEST.PEOPLE", + predicates = Array[String](jdbcPartitionWhereClause), + new Properties) + + df.registerTempTable("tempFrame") + assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2") + } + } }