From da136edbd44506aca6cec6bacb20b46af46fc23a Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Fri, 3 May 2019 11:15:07 -0700
Subject: [PATCH] [SPARK-27596] The JDBC 'query' option doesn't work for
 Oracle database

---
 .../spark/sql/jdbc/DB2IntegrationSuite.scala  | 26 +++++++++++++++++
 .../sql/jdbc/MySQLIntegrationSuite.scala      | 27 ++++++++++++++++++
 .../sql/jdbc/OracleIntegrationSuite.scala     | 28 +++++++++++++++++++
 .../sql/jdbc/PostgresIntegrationSuite.scala   | 26 +++++++++++++++++
 .../datasources/jdbc/JDBCOptions.scala        |  2 +-
 5 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index f5930bc281e8..32e56f03ee52 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -158,4 +158,30 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getInt(1) == 20)
     assert(rows(0).getString(2) == "1")
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, "fred"),
+      (17, "dave")
+    ).map { case (x, y) =>
+      Row(Integer.valueOf(x), String.valueOf(y))
+    }
+
+    val query = "SELECT x, y FROM tbl WHERE x > 10"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index a70ed98b52d5..9cd5c4ec41a5 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -21,6 +21,7 @@ import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.tags.DockerTest
 
 @DockerTest
@@ -152,4 +153,30 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, "fred"),
+      (17, "dave")
+    ).map { case (x, y) =>
+      Row(Integer.valueOf(x), String.valueOf(y))
+    }
+
+    val query = "SELECT x, y FROM tbl WHERE x > 10"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index c9a541783679..64b9837cc5fa 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -485,4 +485,32 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     }
     assert(df2.collect.toSet === expectedResult)
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (1, "1991-11-09", "1996-01-01 01:23:45")
+    ).map { case (id, date, timestamp) =>
+      Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp))
+    }
+
+    val query = "SELECT id, d, t FROM datetime WHERE id = 1"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .option("oracle.jdbc.mapDateToTimestamp", "false")
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl',
+         |   query '$query',
+         |   oracle.jdbc.mapDateToTimestamp false)
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select id, d, t from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index e8d5b468df63..7caf3d6ba59f 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -21,6 +21,7 @@ import java.sql.Connection
 import java.util.Properties
 
 import org.apache.spark.sql.Column
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
 import org.apache.spark.tags.DockerTest
@@ -180,4 +181,29 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getSeq(8) == Seq("""{"a": "foo", "b": "bar"}""", """{"a": 1, "b": 2}"""))
     assert(rows(0).getSeq(9) == Seq("""{"a": 1, "b": 2, "c": 3}"""))
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, 123456789012345L)
+    ).map { case (c1, c3) =>
+      Row(Integer.valueOf(c1), java.lang.Long.valueOf(c3))
+    }
+
+    val query = "SELECT c1, c3 FROM bar WHERE c1 IS NOT NULL"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index b4469cb538fa..d184f3cb71b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -87,7 +87,7 @@ class JDBCOptions(
       if (subquery.isEmpty) {
         throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
       } else {
-        s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
+        s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
       }
     }
 
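
Usage note: the one-line change in JDBCOptions.scala is the actual fix. Spark wraps the `query` option in a generated derived-table alias, and the old prefix `__SPARK_GEN_JDBC_SUBQUERY_NAME_` is already 31 characters before the counter is appended, while Oracle versions prior to 12.2 cap identifier names at 30 bytes, so the wrapped statement failed with ORA-00972 ("identifier is too long"). The shorter `SPARK_GEN_SUBQ_` prefix stays well under the limit on every supported database. Below is a minimal, self-contained sketch of the read path the new tests exercise; the Oracle URL, credentials, and table name are placeholders (not values from the patch), and it assumes Spark plus an Oracle JDBC driver on the classpath.

import org.apache.spark.sql.SparkSession

// Minimal sketch of reading through the JDBC `query` option.
// Point the placeholder URL/credentials at a real Oracle instance
// to reproduce the pre-patch ORA-00972 failure.
object JdbcQueryOptionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-query-option-sketch")
      .master("local[*]")
      .getOrCreate()

    // Spark sends the query wrapped in a generated alias, roughly:
    //   SELECT * FROM (SELECT x, y FROM tbl WHERE x > 10) SPARK_GEN_SUBQ_0
    // With the old alias (__SPARK_GEN_JDBC_SUBQUERY_NAME_0, 32 characters)
    // Oracle rejected this statement as an over-long identifier.
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/XE") // placeholder
      .option("user", "system")                            // placeholder
      .option("password", "secret")                        // placeholder
      .option("query", "SELECT x, y FROM tbl WHERE x > 10")
      .load()

    df.show()
    spark.stop()
  }
}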