From 14aecd82dad8ad1dd9bd2b0b8135a7a172f18152 Mon Sep 17 00:00:00 2001
From: "Zhu, Lipeng"
Date: Fri, 15 Mar 2019 20:21:59 -0500
Subject: [PATCH 1/6] [SPARK-27159][SQL] update mssql server dialect to support binary type

## What changes were proposed in this pull request?

Change the binary type mapping from default blob to varbinary(max) for mssql server.
https://docs.microsoft.com/en-us/sql/t-sql/data-types/binary-and-varbinary-transact-sql?view=sql-server-2017

![image](https://user-images.githubusercontent.com/698621/54351715-0e8c8780-468b-11e9-8931-7ecb85c5ad6b.png)

## How was this patch tested?

Unit test.

Closes #24091 from lipzhu/SPARK-27159.

Authored-by: Zhu, Lipeng
Signed-off-by: Sean Owen
---
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala   |  1 +
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala  | 12 ++++++++++++
 2 files changed, 13 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index da787b4859a73..29500cf2afbc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -38,6 +38,7 @@ private object MsSqlServerDialect extends JdbcDialect {
     case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
+    case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
     case _ => None
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 8e49c52c73187..66cc16d3bf597 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -860,6 +860,18 @@ class JDBCSuite extends QueryTest
       Some(TimestampType))
   }

+  test("MsSqlServerDialect jdbc type mapping") {
+    val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
+    assert(msSqlServerDialect.getJDBCType(TimestampType).map(_.databaseTypeDefinition).get ==
+      "DATETIME")
+    assert(msSqlServerDialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get ==
+      "NVARCHAR(MAX)")
+    assert(msSqlServerDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get ==
+      "BIT")
+    assert(msSqlServerDialect.getJDBCType(BinaryType).map(_.databaseTypeDefinition).get ==
+      "VARBINARY(MAX)")
+  }
+
   test("table exists query by jdbc dialect") {
     val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
     val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
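As a usage illustration (not part of the patch), the sketch below shows where this mapping matters: writing a DataFrame with a BinaryType column to SQL Server over JDBC. The SparkSession setup, connection URL, credentials and table name are hypothetical; the only behavior taken from the patch is that the dialect now emits VARBINARY(MAX) for BinaryType instead of the generic BLOB fallback, which SQL Server does not support as a column type.

```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession

object VarbinaryWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("varbinary-write-sketch").getOrCreate()
    import spark.implicits._

    // One BinaryType column; with this patch the CREATE TABLE issued by the JDBC
    // writer declares it as VARBINARY(MAX).
    val df = Seq((1, Array[Byte](1, 2)), (2, Array[Byte](3))).toDF("id", "payload")

    val props = new Properties()
    props.setProperty("user", "sa")            // hypothetical credentials
    props.setProperty("password", "Sapass123")

    // Hypothetical endpoint; the "jdbc:sqlserver" prefix is what selects MsSqlServerDialect.
    df.write.jdbc("jdbc:sqlserver://localhost:1433;database=test", "binary_tbl", props)

    spark.stop()
  }
}
```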
From ad967f436add69ba55da8a830e3a9aa047459966 Mon Sep 17 00:00:00 2001
From: "Zhu, Lipeng"
Date: Tue, 19 Mar 2019 08:43:23 -0700
Subject: [PATCH 2/6] [SPARK-27168][SQL][TEST] Add docker integration test for MsSql server

## What changes were proposed in this pull request?

This PR aims to add a JDBC integration test for MsSql server.

## How was this patch tested?

```
./build/mvn clean install -DskipTests
./build/mvn test -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12 \
  -Dtest=none -DwildcardSuites=org.apache.spark.sql.jdbc.MsSqlServerIntegrationSuite
```

Closes #24099 from lipzhu/SPARK-27168.

Lead-authored-by: Zhu, Lipeng
Co-authored-by: Dongjoon Hyun
Co-authored-by: Lipeng Zhu
Signed-off-by: Dongjoon Hyun
---
 external/docker-integration-tests/pom.xml      |   6 +
 .../jdbc/MsSqlServerIntegrationSuite.scala     | 205 ++++++++++++++++++
 2 files changed, 211 insertions(+)
 create mode 100644 external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala

diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml
index 83eae508090d0..f4fa6f778879f 100644
--- a/external/docker-integration-tests/pom.xml
+++ b/external/docker-integration-tests/pom.xml
@@ -150,5 +150,11 @@
       <version>10.5.0.5</version>
       <type>jar</type>
     </dependency>
+    <dependency>
+      <groupId>com.microsoft.sqlserver</groupId>
+      <artifactId>mssql-jdbc</artifactId>
+      <version>7.2.1.jre8</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
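The new pom entry above puts Microsoft's JDBC driver on the test classpath. As a rough, hypothetical sanity check outside Spark (not part of the patch), a bare JDBC connection using the same URL shape the suite builds in getJdbcUrl could look like this; host, port and password are assumptions matching the container settings in the suite that follows.

```scala
import java.sql.DriverManager

object MsSqlConnectivitySketch {
  def main(args: Array[String]): Unit = {
    // Same URL format as DatabaseOnDocker.getJdbcUrl in the integration suite below.
    val url = "jdbc:sqlserver://localhost:1433;user=sa;password=Sapass123;"

    // The com.microsoft.sqlserver:mssql-jdbc artifact added above provides the driver.
    val conn = DriverManager.getConnection(url)
    try {
      val rs = conn.createStatement().executeQuery("SELECT 1")
      while (rs.next()) {
        println(s"Connected, SELECT 1 returned ${rs.getInt(1)}")
      }
    } finally {
      conn.close()
    }
  }
}
```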
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
new file mode 100644
index 0000000000000..82ce16c2b7e5a
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.math.BigDecimal
+import java.sql.{Connection, Date, Timestamp}
+import java.util.Properties
+
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
+class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
+  override val db = new DatabaseOnDocker {
+    override val imageName = "mcr.microsoft.com/mssql/server:2017-GA-ubuntu"
+    override val env = Map(
+      "SA_PASSWORD" -> "Sapass123",
+      "ACCEPT_EULA" -> "Y"
+    )
+    override val usesIpc = false
+    override val jdbcPort: Int = 1433
+
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:sqlserver://$ip:$port;user=sa;password=Sapass123;"
+
+    override def getStartupProcessName: Option[String] = None
+  }
+
+  override def dataPreparation(conn: Connection): Unit = {
+    conn.prepareStatement("CREATE TABLE tbl (x INT, y VARCHAR (50))").executeUpdate()
+    conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
+    conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()
+
+    conn.prepareStatement(
+      """
+        |CREATE TABLE numbers (
+        |a BIT,
+        |b TINYINT, c SMALLINT, d INT, e BIGINT,
+        |f FLOAT, f1 FLOAT(24),
+        |g REAL,
+        |h DECIMAL(5,2), i NUMERIC(10,5),
+        |j MONEY, k SMALLMONEY)
+      """.stripMargin).executeUpdate()
+    conn.prepareStatement(
+      """
+        |INSERT INTO numbers VALUES (
+        |0,
+        |255, 32767, 2147483647, 9223372036854775807,
+        |123456789012345.123456789012345, 123456789012345.123456789012345,
+        |123456789012345.123456789012345,
+        |123, 12345.12,
+        |922337203685477.58, 214748.3647)
+      """.stripMargin).executeUpdate()
+
+    conn.prepareStatement(
+      """
+        |CREATE TABLE dates (
+        |a DATE, b DATETIME, c DATETIME2,
+        |d DATETIMEOFFSET, e SMALLDATETIME,
+        |f TIME)
+      """.stripMargin).executeUpdate()
+    conn.prepareStatement(
+      """
+        |INSERT INTO dates VALUES (
+        |'1991-11-09', '1999-01-01 13:23:35', '9999-12-31 23:59:59',
+        |'1901-05-09 23:59:59 +14:00', '1996-01-01 23:23:45',
+        |'13:31:24')
+      """.stripMargin).executeUpdate()
+
+    conn.prepareStatement(
+      """
+        |CREATE TABLE strings (
+        |a CHAR(10), b VARCHAR(10),
+        |c NCHAR(10), d NVARCHAR(10),
+        |e BINARY(4), f VARBINARY(4),
+        |g TEXT, h NTEXT,
+        |i IMAGE)
+      """.stripMargin).executeUpdate()
+    conn.prepareStatement(
+      """
+        |INSERT INTO strings VALUES (
+        |'the', 'quick',
+        |'brown', 'fox',
+        |123456, 123456,
+        |'the', 'lazy',
+        |'dog')
+      """.stripMargin).executeUpdate()
+  }
+
+  test("Basic test") {
+    val df = spark.read.jdbc(jdbcUrl, "tbl", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 2)
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types.length == 2)
+    assert(types(0).equals("class java.lang.Integer"))
+    assert(types(1).equals("class java.lang.String"))
+  }
+
+  test("Numeric types") {
+    val df = spark.read.jdbc(jdbcUrl, "numbers", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val row = rows(0)
+    val types = row.toSeq.map(x => x.getClass.toString)
+    assert(types.length == 12)
+    assert(types(0).equals("class java.lang.Boolean"))
+    assert(types(1).equals("class java.lang.Integer"))
+    assert(types(2).equals("class java.lang.Integer"))
+    assert(types(3).equals("class java.lang.Integer"))
+    assert(types(4).equals("class java.lang.Long"))
+    assert(types(5).equals("class java.lang.Double"))
+    assert(types(6).equals("class java.lang.Double"))
+    assert(types(7).equals("class java.lang.Double"))
+    assert(types(8).equals("class java.math.BigDecimal"))
+    assert(types(9).equals("class java.math.BigDecimal"))
+    assert(types(10).equals("class java.math.BigDecimal"))
+    assert(types(11).equals("class java.math.BigDecimal"))
+    assert(row.getBoolean(0) == false)
+    assert(row.getInt(1) == 255)
+    assert(row.getInt(2) == 32767)
+    assert(row.getInt(3) == 2147483647)
+    assert(row.getLong(4) == 9223372036854775807L)
+    assert(row.getDouble(5) == 1.2345678901234512E14) // float = float(53) has 15-digits precision
+    assert(row.getDouble(6) == 1.23456788103168E14) // float(24) has 7-digits precision
+    assert(row.getDouble(7) == 1.23456788103168E14) // real = float(24)
+    assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
+    assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
+    assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
+    assert(row.getAs[BigDecimal](11).equals(new BigDecimal("214748.3647")))
+  }
+
+  test("Date types") {
+    val df = spark.read.jdbc(jdbcUrl, "dates", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val row = rows(0)
+    val types = row.toSeq.map(x => x.getClass.toString)
+    assert(types.length == 6)
+    assert(types(0).equals("class java.sql.Date"))
+    assert(types(1).equals("class java.sql.Timestamp"))
+    assert(types(2).equals("class java.sql.Timestamp"))
+    assert(types(3).equals("class java.lang.String"))
+    assert(types(4).equals("class java.sql.Timestamp"))
+    assert(types(5).equals("class java.sql.Timestamp"))
+    assert(row.getAs[Date](0).equals(Date.valueOf("1991-11-09")))
+    assert(row.getAs[Timestamp](1).equals(Timestamp.valueOf("1999-01-01 13:23:35.0")))
+    assert(row.getAs[Timestamp](2).equals(Timestamp.valueOf("9999-12-31 23:59:59.0")))
+    assert(row.getString(3).equals("1901-05-09 23:59:59.0000000 +14:00"))
+    assert(row.getAs[Timestamp](4).equals(Timestamp.valueOf("1996-01-01 23:24:00.0")))
+    assert(row.getAs[Timestamp](5).equals(Timestamp.valueOf("1900-01-01 13:31:24.0")))
+  }
+
+  test("String types") {
+    val df = spark.read.jdbc(jdbcUrl, "strings", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val row = rows(0)
+    val types = row.toSeq.map(x => x.getClass.toString)
+    assert(types.length == 9)
+    assert(types(0).equals("class java.lang.String"))
+    assert(types(1).equals("class java.lang.String"))
+    assert(types(2).equals("class java.lang.String"))
+    assert(types(3).equals("class java.lang.String"))
+    assert(types(4).equals("class [B"))
+    assert(types(5).equals("class [B"))
+    assert(types(6).equals("class java.lang.String"))
+    assert(types(7).equals("class java.lang.String"))
+    assert(types(8).equals("class [B"))
+    assert(row.getString(0).length == 10)
+    assert(row.getString(0).trim.equals("the"))
+    assert(row.getString(1).equals("quick"))
+    assert(row.getString(2).length == 10)
+    assert(row.getString(2).trim.equals("brown"))
+    assert(row.getString(3).equals("fox"))
+    assert(java.util.Arrays.equals(row.getAs[Array[Byte]](4), Array[Byte](0, 1, -30, 64)))
+    assert(java.util.Arrays.equals(row.getAs[Array[Byte]](5), Array[Byte](0, 1, -30, 64)))
+    assert(row.getString(6).equals("the"))
+    assert(row.getString(7).equals("lazy"))
+    assert(java.util.Arrays.equals(row.getAs[Array[Byte]](8), Array[Byte](100, 111, 103)))
+  }
+
+  test("Basic write test") {
+    val df1 = spark.read.jdbc(jdbcUrl, "numbers", new Properties)
+    val df2 = spark.read.jdbc(jdbcUrl, "dates", new Properties)
+    val df3 = spark.read.jdbc(jdbcUrl, "strings", new Properties)
+    df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
+    df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
+    df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
+  }
+}
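For readers who want to try the same path interactively, here is a hedged sketch of the user-facing API the suite exercises; the JDBC URL and SparkSession setup are assumptions, while the table names (tbl, numbers, numberscopy) and the Properties-based read/write calls mirror the tests above.

```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession

object MsSqlReadWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mssql-read-write-sketch").getOrCreate()

    // Hypothetical endpoint; the suite derives the real one from the container's IP and port.
    val jdbcUrl = "jdbc:sqlserver://localhost:1433;user=sa;password=Sapass123;"

    // Same call shape as "Basic test": load a SQL Server table into a DataFrame.
    val tbl = spark.read.jdbc(jdbcUrl, "tbl", new Properties)
    tbl.show()

    // Same call shape as "Basic write test": round-trip a table into a copy.
    val numbers = spark.read.jdbc(jdbcUrl, "numbers", new Properties)
    numbers.printSchema()
    numbers.write.jdbc(jdbcUrl, "numberscopy", new Properties)

    spark.stop()
  }
}
```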
From 277fda09faaf3281c7ea9002ebda10d58a6e1d25 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Mon, 15 Jul 2019 12:12:36 -0700
Subject: [PATCH 3/6] [SPARK-28152][SQL] Mapped ShortType to SMALLINT and FloatType to REAL for MsSqlServerDialect

## What changes were proposed in this pull request?

This PR corrects the mappings in `MsSqlServerDialect`: `ShortType` is mapped to `SMALLINT` and `FloatType` is mapped to `REAL`, per the [JDBC mapping](https://docs.microsoft.com/en-us/sql/connect/jdbc/using-basic-data-types?view=sql-server-2017).

ShortType and FloatType were not mapped to the right JDBC types when using the JDBC connector, so tables and Spark data frames were created with unintended types. The issue was observed when validating against SQL Server. Refer to the [JDBC mapping](https://docs.microsoft.com/en-us/sql/connect/jdbc/using-basic-data-types?view=sql-server-2017) for guidance on the mappings between SQL Server, JDBC and Java: the Java "Short" type should map to JDBC "SMALLINT" and the Java "Float" type should map to JDBC "REAL".

Example issues caused by the wrong mappings:
- A write from a data frame with a ShortType column creates a SQL table with that column typed as INTEGER instead of SMALLINT, and thus a larger table than expected.
- A read produces a data frame column of IntegerType instead of ShortType.
- ShortType has a problem on both the write and the read path.
- FloatType only has an issue on the read path. On the write path, the Spark data type 'FloatType' is correctly mapped to the JDBC equivalent 'REAL'. But on the read path, when JDBC data types are converted to Catalyst data types (getCatalystType), 'REAL' is incorrectly mapped to 'DoubleType' rather than 'FloatType'.

Refer to #28151, which contained this fix as one part of a larger PR. Following the discussion on #28151, it was decided to file separate PRs for each of the fixes.

## How was this patch tested?

Unit tests added in JDBCSuite.scala. The integration test in MsSqlServerIntegrationSuite.scala was updated and passes. E2E test done with SQL Server.

Closes #25146 from shivsood/float_short_type_fix.

Authored-by: shivsood
Signed-off-by: Dongjoon Hyun
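To make the two halves of the fix concrete, the sketch below calls the same dialect entry points the added JDBCSuite tests use: getJDBCType covers the write path (Catalyst type to DDL type) and getCatalystType covers the read path (JDBC type to Catalyst type). The printed values describe the behavior after this patch; the sketch only assumes spark-sql is on the classpath.

```scala
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{MetadataBuilder, ShortType}

object MsSqlTypeMappingSketch {
  def main(args: Array[String]): Unit = {
    // jdbc:sqlserver URLs resolve to MsSqlServerDialect.
    val dialect = JdbcDialects.get("jdbc:sqlserver")

    // Write path: ShortType now produces SMALLINT DDL instead of the generic INTEGER fallback.
    println(dialect.getJDBCType(ShortType).map(_.databaseTypeDefinition)) // Some(SMALLINT)

    // Read path: SMALLINT and REAL columns now come back as ShortType and FloatType
    // instead of IntegerType and DoubleType.
    val metadata = new MetadataBuilder().putLong("scale", 1)
    println(dialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1, metadata)) // Some(ShortType)
    println(dialect.getCatalystType(java.sql.Types.REAL, "REAL", 1, metadata))         // Some(FloatType)
  }
}
```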
---
 .../spark/sql/jdbc/MsSqlServerIntegrationSuite.scala  | 12 ++++++------
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala    |  7 ++++++-
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala   | 11 +++++++++++
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 82ce16c2b7e5a..efd7ca74c796b 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -120,24 +120,24 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types.length == 12)
     assert(types(0).equals("class java.lang.Boolean"))
     assert(types(1).equals("class java.lang.Integer"))
-    assert(types(2).equals("class java.lang.Integer"))
+    assert(types(2).equals("class java.lang.Short"))
     assert(types(3).equals("class java.lang.Integer"))
     assert(types(4).equals("class java.lang.Long"))
     assert(types(5).equals("class java.lang.Double"))
-    assert(types(6).equals("class java.lang.Double"))
-    assert(types(7).equals("class java.lang.Double"))
+    assert(types(6).equals("class java.lang.Float"))
+    assert(types(7).equals("class java.lang.Float"))
     assert(types(8).equals("class java.math.BigDecimal"))
     assert(types(9).equals("class java.math.BigDecimal"))
     assert(types(10).equals("class java.math.BigDecimal"))
     assert(types(11).equals("class java.math.BigDecimal"))
     assert(row.getBoolean(0) == false)
     assert(row.getInt(1) == 255)
-    assert(row.getInt(2) == 32767)
+    assert(row.getShort(2) == 32767)
     assert(row.getInt(3) == 2147483647)
     assert(row.getLong(4) == 9223372036854775807L)
     assert(row.getDouble(5) == 1.2345678901234512E14) // float = float(53) has 15-digits precision
-    assert(row.getDouble(6) == 1.23456788103168E14) // float(24) has 7-digits precision
-    assert(row.getDouble(7) == 1.23456788103168E14) // real = float(24)
+    assert(row.getFloat(6) == 1.23456788103168E14) // float(24) has 7-digits precision
+    assert(row.getFloat(7) == 1.23456788103168E14) // real = float(24)
     assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
     assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
     assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 29500cf2afbc0..805f73dee141b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -30,7 +30,11 @@ private object MsSqlServerDialect extends JdbcDialect {
       // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
       Option(StringType)
     } else {
-      None
+      sqlType match {
+        case java.sql.Types.SMALLINT => Some(ShortType)
+        case java.sql.Types.REAL => Some(FloatType)
+        case _ => None
+      }
     }
   }

@@ -39,6 +43,7 @@ private object MsSqlServerDialect extends JdbcDialect {
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
+    case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
     case _ => None
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 66cc16d3bf597..f4326aa6afd22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -870,6 +870,17 @@ class JDBCSuite extends QueryTest
       "BIT")
     assert(msSqlServerDialect.getJDBCType(BinaryType).map(_.databaseTypeDefinition).get ==
       "VARBINARY(MAX)")
+    assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
+      "SMALLINT")
+  }
+
+  test("SPARK-28152 MsSqlServerDialect catalyst type mapping") {
+    val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
+    val metadata = new MetadataBuilder().putLong("scale", 1)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
+      metadata).get == ShortType)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
+      metadata).get == FloatType)
   }

   test("table exists query by jdbc dialect") {
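All three patches rely on the same dialect-resolution mechanism: JdbcDialects.get matches the connection URL against the available dialects, and jdbc:sqlserver URLs resolve to MsSqlServerDialect. As a purely illustrative sketch (not part of the patch series), the developer API also allows registering a custom dialect, which can take precedence over the built-in one for the types it chooses to override; the dialect below and its NVARCHAR(4000) choice are hypothetical.

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

object CustomMsSqlDialectSketch {
  // Hypothetical override: cap strings at NVARCHAR(4000) instead of NVARCHAR(MAX).
  val cappedStrings: JdbcDialect = new JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
      case StringType => Some(JdbcType("NVARCHAR(4000)", java.sql.Types.NVARCHAR))
      case _ => None // fall through to the built-in MsSqlServerDialect mappings
    }
  }

  def main(args: Array[String]): Unit = {
    JdbcDialects.registerDialect(cappedStrings)
    // Expected to report NVARCHAR(4000) for StringType while the override is registered.
    println(JdbcDialects.get("jdbc:sqlserver").getJDBCType(StringType))
    JdbcDialects.unregisterDialect(cappedStrings)
  }
}
```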