From 30a01c9ec2f44339511cf3fb816d91650e0f7ebb Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 18 Dec 2015 14:21:57 +0900
Subject: [PATCH 1/5] Add tests in JDBCSuite

---
 .../execution/datasources/jdbc/JDBCRDD.scala  |  2 ++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 24 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 2d38562e0901..24710264c401 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -381,6 +381,8 @@ private[sql] class JDBCRDD(
     val myWhereClause = getWhereClause(part)
 
     val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
+    logDebug(s"'${sqlText}' input into JDBC")
+
     val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY,
         ResultSet.CONCUR_READ_ONLY)
     val fetchSize = properties.getProperty("fetchsize", "0").toInt
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 2b91f62c2fa2..6da5e0717765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -23,13 +23,19 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.BeforeAndAfter
+import org.scalatest.PrivateMethodTester
 
+import org.apache.spark.Partition
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.sources._
 import org.apache.spark.util.Utils
 
-class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext {
+class JDBCSuite extends SparkFunSuite
+  with BeforeAndAfter with PrivateMethodTester with SharedSQLContext
+{
   import testImplicits._
 
   val url = "jdbc:h2:mem:testdb0"
@@ -429,6 +435,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
     assert(DerbyColumns === Seq(""""abc"""", """"key""""))
   }
 
+  test("compile filters") {
+    val jdbcRdd = JDBCRDD.scanTable(
+      null, null, "", "", null, "", Array.empty[String], Array.empty[Filter], Array.empty[Partition])
+    val compileFilter = PrivateMethod[Unit]('compileFilter)
+    def doCompileFilter(f: Filter) = jdbcRdd invokePrivate compileFilter(f)
+
+    assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3")
+    assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != abc")
+    assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5")
+    assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5")
+    assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3")
+    assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3")
+    assert(doCompileFilter(IsNull("col1")) === "col0 IS NULL")
+    assert(doCompileFilter(IsNotNull("col1")) === "col0 IS NOT NULL")
+  }
+
   test("Dialect unregister") {
     JdbcDialects.registerDialect(testH2Dialect)
     JdbcDialects.unregisterDialect(testH2Dialect)

From ed94623cb01e36e790824903b9e937495cae3942 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Mon, 21 Dec 2015 14:34:45 +0900
Subject: [PATCH 2/5] fix minor bugs

---
 .../execution/datasources/jdbc/JDBCRDD.scala  | 59 ++++++++++---------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 19 +++---
 2 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 24710264c401..e02879cbcaa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -167,6 +167,35 @@ private[sql] object JDBCRDD extends Logging {
     new StructType(columns map { name => fieldMap(name) })
   }
 
+  /**
+   * Converts value to SQL expression.
+   */
+  private def compileValue(value: Any): Any = value match {
+    case stringValue: String => s"'${escapeSql(stringValue)}'"
+    case timestampValue: Timestamp => "'" + timestampValue + "'"
+    case dateValue: Date => "'" + dateValue + "'"
+    case _ => value
+  }
+
+  private def escapeSql(value: String): String =
+    if (value == null) null else StringUtils.replace(value, "'", "''")
+
+  /**
+   * Turns a single Filter into a String representing a SQL expression.
+   * Returns null for an unhandled filter.
+   */
+  private def compileFilter(f: Filter): String = f match {
+    case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
+    case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}"
+    case LessThan(attr, value) => s"$attr < ${compileValue(value)}"
+    case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}"
+    case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}"
+    case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}"
+    case IsNull(attr) => s"$attr IS NULL"
+    case IsNotNull(attr) => s"$attr IS NOT NULL"
+    case _ => null
+  }
+
   /**
    * Given a driver string and an url, return a function that loads the
    * specified driver string then returns a connection to the JDBC url.
@@ -262,40 +291,12 @@ private[sql] class JDBCRDD(
     if (sb.length == 0) "1" else sb.substring(1)
   }
 
-  /**
-   * Converts value to SQL expression.
-   */
-  private def compileValue(value: Any): Any = value match {
-    case stringValue: String => s"'${escapeSql(stringValue)}'"
-    case timestampValue: Timestamp => "'" + timestampValue + "'"
-    case dateValue: Date => "'" + dateValue + "'"
-    case _ => value
-  }
-
-  private def escapeSql(value: String): String =
-    if (value == null) null else StringUtils.replace(value, "'", "''")
-
-  /**
-   * Turns a single Filter into a String representing a SQL expression.
-   * Returns null for an unhandled filter.
-   */
-  private def compileFilter(f: Filter): String = f match {
-    case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
-    case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}"
-    case LessThan(attr, value) => s"$attr < ${compileValue(value)}"
-    case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}"
-    case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}"
-    case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}"
-    case IsNull(attr) => s"$attr IS NULL"
-    case IsNotNull(attr) => s"$attr IS NOT NULL"
-    case _ => null
-  }
-
   /**
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
   */
   private val filterWhereClause: String = {
-    val filterStrings = filters map compileFilter filter (_ != null)
+    val filterStrings = filters map JDBCRDD.compileFilter filter (_ != null)
     if (filterStrings.size > 0) {
       val sb = new StringBuilder("WHERE ")
       filterStrings.foreach(x => sb.append(x).append(" AND "))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 6da5e0717765..036153f971d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
-import java.sql.DriverManager
+import java.sql.{Date, DriverManager, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester
 
-import org.apache.spark.Partition
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.test.SharedSQLContext
@@ -436,19 +435,19 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("compile filters") {
-    val jdbcRdd = JDBCRDD.scanTable(
-      null, null, "", "", null, "", Array.empty[String], Array.empty[Filter], Array.empty[Partition])
-    val compileFilter = PrivateMethod[Unit]('compileFilter)
-    def doCompileFilter(f: Filter) = jdbcRdd invokePrivate compileFilter(f)
-
+    val compileFilter = PrivateMethod[String]('compileFilter)
+    def doCompileFilter(f: Filter) = JDBCRDD invokePrivate compileFilter(f)
     assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3")
-    assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != abc")
+    assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'")
     assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5")
+    assert(doCompileFilter(LessThan("col3",
+      Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'")
+    assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04"))) === "col4 < '1983-08-04'")
     assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5")
     assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3")
     assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3")
-    assert(doCompileFilter(IsNull("col1")) === "col0 IS NULL")
-    assert(doCompileFilter(IsNotNull("col1")) === "col0 IS NOT NULL")
+    assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL")
+    assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL")
   }
 
   test("Dialect unregister") {

From 9932f066bf64163cef4201257f3813a085863785 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 22 Dec 2015 09:34:14 +0900
Subject: [PATCH 3/5] Fix style errors

---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 036153f971d3..8a373cdf217e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -436,7 +436,7 @@ class JDBCSuite extends SparkFunSuite
 
   test("compile filters") {
     val compileFilter = PrivateMethod[String]('compileFilter)
-    def doCompileFilter(f: Filter) = JDBCRDD invokePrivate compileFilter(f)
+    def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f)
     assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3")
     assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'")
     assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5")

From f618148bcb567ef28ec349412b6faebc87623e6e Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 22 Dec 2015 15:56:12 +0900
Subject: [PATCH 4/5] Fix format errors

---
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala   | 6 +++---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 3 +--
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index e02879cbcaa1..528087716b8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -163,8 +163,8 @@ private[sql] object JDBCRDD extends Logging {
    * @return A Catalyst schema corresponding to columns in the given order.
    */
   private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
-    val fieldMap = Map(schema.fields map { x => x.metadata.getString("name") -> x }: _*)
-    new StructType(columns map { name => fieldMap(name) })
+    val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*)
+    new StructType(columns.map(name => fieldMap(name)))
   }
 
   /**
@@ -296,7 +296,7 @@ private[sql] class JDBCRDD(
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
    */
   private val filterWhereClause: String = {
-    val filterStrings = filters map JDBCRDD.compileFilter filter (_ != null)
+    val filterStrings = filters.map(JDBCRDD.compileFilter).filter(_ != null)
     if (filterStrings.size > 0) {
       val sb = new StringBuilder("WHERE ")
       filterStrings.foreach(x => sb.append(x).append(" AND "))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 8a373cdf217e..7975c5df6c0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -33,8 +33,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.util.Utils
 
 class JDBCSuite extends SparkFunSuite
-  with BeforeAndAfter with PrivateMethodTester with SharedSQLContext
-{
+  with BeforeAndAfter with PrivateMethodTester with SharedSQLContext {
   import testImplicits._
 
   val url = "jdbc:h2:mem:testdb0"

From 46c0e7f659c815b8510de13e03be48eb13acdf00 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 22 Dec 2015 16:02:47 +0900
Subject: [PATCH 5/5] Remove unnecessary logging

---
 .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 528087716b8f..fc0f86cb1813 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -382,8 +382,6 @@ private[sql] class JDBCRDD(
     val myWhereClause = getWhereClause(part)
 
     val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
-    logDebug(s"'${sqlText}' input into JDBC")
-
     val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY,
         ResultSet.CONCUR_READ_ONLY)
     val fetchSize = properties.getProperty("fetchsize", "0").toInt
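
Note: `compileFilter` turns Spark `Filter` predicates into SQL fragments that `filterWhereClause` ANDs into the WHERE clause pushed to the JDBC source. The standalone sketch below reproduces that pattern outside Spark so the quoting behaviour asserted in the JDBCSuite test can be run directly. `FilterCompiler`, `FilterCompilerDemo`, and the cut-down `Filter` case classes are illustrative stand-ins, not the Spark API, and only a few of the handled cases are shown.

import java.sql.{Date, Timestamp}

// Cut-down stand-ins for the org.apache.spark.sql.sources filter types (illustrative only).
sealed trait Filter
case class EqualTo(attr: String, value: Any) extends Filter
case class Not(child: Filter) extends Filter
case class LessThan(attr: String, value: Any) extends Filter
case class IsNull(attr: String) extends Filter

object FilterCompiler {
  // Quotes strings, timestamps, and dates, doubling any embedded single quote,
  // as compileValue/escapeSql do in the patches above.
  private def compileValue(value: Any): Any = value match {
    case stringValue: String => "'" + stringValue.replace("'", "''") + "'"
    case timestampValue: Timestamp => "'" + timestampValue + "'"
    case dateValue: Date => "'" + dateValue + "'"
    case _ => value
  }

  // Mirrors the shape of JDBCRDD.compileFilter: an unhandled filter compiles to null.
  def compileFilter(f: Filter): String = f match {
    case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
    case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}"
    case LessThan(attr, value) => s"$attr < ${compileValue(value)}"
    case IsNull(attr) => s"$attr IS NULL"
    case _ => null
  }
}

object FilterCompilerDemo extends App {
  // Expected output matches the assertions in the JDBCSuite test above.
  println(FilterCompiler.compileFilter(Not(EqualTo("col1", "abc"))))                  // col1 != 'abc'
  println(FilterCompiler.compileFilter(LessThan("col4", Date.valueOf("1983-08-04")))) // col4 < '1983-08-04'
  println(FilterCompiler.compileFilter(IsNull("col1")))                               // col1 IS NULL
}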