From 1e3adbc728b03b50850d461da6d4d7e7d00b8e37 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 24 Dec 2015 17:27:27 +0800 Subject: [PATCH 1/4] Refactor filter pushdown for JDBCRDD and add few filters. --- .../execution/datasources/jdbc/JDBCRDD.scala | 61 ++++++++++++------- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 32 +++++++++- 2 files changed, 67 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index fc0f86cb1813..be703828d917 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -174,6 +174,7 @@ private[sql] object JDBCRDD extends Logging { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" case dateValue: Date => "'" + dateValue + "'" + case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } @@ -184,16 +185,38 @@ private[sql] object JDBCRDD extends Logging { * Turns a single Filter into a String representing a SQL expression. * Returns null for an unhandled filter. */ - private def compileFilter(f: Filter): String = f match { - case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" - case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}" - case LessThan(attr, value) => s"$attr < ${compileValue(value)}" - case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" - case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" - case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" - case IsNull(attr) => s"$attr IS NULL" - case IsNotNull(attr) => s"$attr IS NOT NULL" - case _ => null + private def compileFilter(f: Filter): Option[String] = { + Some(f match { + case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" + case LessThan(attr, value) => s"$attr < ${compileValue(value)}" + case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" + case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" + case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case IsNull(attr) => s"$attr IS NULL" + case IsNotNull(attr) => s"$attr IS NOT NULL" + case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'" + case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'" + case StringContains(attr, value) => s"${attr} LIKE '%${value}%'" + case In(attr, value) => s"$attr IN (${compileValue(value)})" + case Not(f) => compileFilter(f).map(p => s"(NOT ($p))").getOrElse(null) + case Or(f1, f2) => + // We can't compile Or filter unless both sub-filters are compiled successfully. + // It applies too for the following And filter. + val or = Seq(f1, f2).map(compileFilter(_)).flatten + if (or.size == 2) { + or.map(p => s"($p)").mkString(" OR ") + } else { + null + } + case And(f1, f2) => + val and = Seq(f1, f2).map(compileFilter(_)).flatten + if (and.size == 2) { + and.map(p => s"($p)").mkString(" AND ") + } else { + null + } + case _ => null + }) } /** @@ -295,25 +318,17 @@ private[sql] class JDBCRDD( /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ - private val filterWhereClause: String = { - val filterStrings = filters.map(JDBCRDD.compileFilter).filter(_ != null) - if (filterStrings.size > 0) { - val sb = new StringBuilder("WHERE ") - filterStrings.foreach(x => sb.append(x).append(" AND ")) - sb.substring(0, sb.length - 5) - } else "" - } + private val filterWhereClause: Seq[Option[String]] = filters.map(JDBCRDD.compileFilter) /** * A WHERE clause representing both `filters`, if any, and the current partition. */ private def getWhereClause(part: JDBCPartition): String = { - if (part.whereClause != null && filterWhereClause.length > 0) { - filterWhereClause + " AND " + part.whereClause - } else if (part.whereClause != null) { - "WHERE " + part.whereClause + val w = filterWhereClause.+:(Option(part.whereClause)).flatten.mkString(" AND ") + if (!w.isEmpty) { + "WHERE " + w } else { - filterWhereClause + w } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7975c5df6c0b..23a876f165cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.Row import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -186,8 +187,26 @@ class JDBCSuite extends SparkFunSuite assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) + .collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) + .collect().size === 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) + .collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + + "AND THEID = 2")).collect().size == 2) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) + assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) + + // This is a test to reflect discussion in SPARK-12218. + // The older versions of spark have this kind of bugs in parquet data source. + val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')") + val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')") + assert(df1.collect.toSet === Set(Row("mary", 2))) + assert(df2.collect.toSet === Set(Row("mary", 2))) } test("SELECT * WHERE (quoted strings)") { @@ -434,10 +453,14 @@ class JDBCSuite extends SparkFunSuite } test("compile filters") { - val compileFilter = PrivateMethod[String]('compileFilter) - def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) + val compileFilter = PrivateMethod[Option[String]]('compileFilter) + def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) getOrElse("") assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") - assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'") + assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))") + assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def"))) + === "(col0 = 0) AND (col1 = 'def')") + assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi"))) + === "(col0 = 2) OR (col1 = 'ghi')") assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") assert(doCompileFilter(LessThan("col3", Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") @@ -445,6 +468,9 @@ class JDBCSuite extends SparkFunSuite assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") + assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')") + assert(doCompileFilter(Not(In("col1", Array("mno", "pqr")))) + === "(NOT (col1 IN ('mno', 'pqr')))") assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") } From 05188b73d3b80023749cf281f0ede194108baa33 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 24 Dec 2015 17:39:00 +0800 Subject: [PATCH 2/4] Use Option instead of Some. --- .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index be703828d917..6f91f91ebbaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -186,7 +186,7 @@ private[sql] object JDBCRDD extends Logging { * Returns null for an unhandled filter. */ private def compileFilter(f: Filter): Option[String] = { - Some(f match { + Option(f match { case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" case LessThan(attr, value) => s"$attr < ${compileValue(value)}" case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" From b0da6fc402ad001f1a8a20dcf882fedb880b3f75 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 29 Dec 2015 16:41:32 +0800 Subject: [PATCH 3/4] Update method comment. --- .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 6f91f91ebbaa..1b801e3c2a00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -183,7 +183,7 @@ private[sql] object JDBCRDD extends Logging { /** * Turns a single Filter into a String representing a SQL expression. - * Returns null for an unhandled filter. + * Returns None for an unhandled filter. */ private def compileFilter(f: Filter): Option[String] = { Option(f match { From a7ef79e7b51aeac2d0cae47914654dc42a64492a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 30 Dec 2015 12:18:38 +0800 Subject: [PATCH 4/4] For comments. --- .../sql/execution/datasources/jdbc/JDBCRDD.scala | 14 +++++++++----- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 3 ++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 1b801e3c2a00..aa34224ea4e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -318,17 +318,21 @@ private[sql] class JDBCRDD( /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ - private val filterWhereClause: Seq[Option[String]] = filters.map(JDBCRDD.compileFilter) + private val filterWhereClause: String = + filters.map(JDBCRDD.compileFilter).flatten.mkString(" AND ") /** * A WHERE clause representing both `filters`, if any, and the current partition. */ private def getWhereClause(part: JDBCPartition): String = { - val w = filterWhereClause.+:(Option(part.whereClause)).flatten.mkString(" AND ") - if (!w.isEmpty) { - "WHERE " + w + if (part.whereClause != null && filterWhereClause.length > 0) { + "WHERE " + filterWhereClause + " AND " + part.whereClause + } else if (part.whereClause != null) { + "WHERE " + part.whereClause + } else if (filterWhereClause.length > 0) { + "WHERE " + filterWhereClause } else { - w + "" } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 23a876f165cc..3cd48175a1cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -190,7 +190,7 @@ class JDBCSuite extends SparkFunSuite assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) .collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) - .collect().size === 2) + .collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) .collect().size == 2) assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " @@ -473,6 +473,7 @@ class JDBCSuite extends SparkFunSuite === "(NOT (col1 IN ('mno', 'pqr')))") assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") + assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def"))) === "") } test("Dialect unregister") {