From 39d22d5aea2508f412dc26b0cdd1e7d130f94e09 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Fri, 21 Jun 2019 14:44:24 -0700
Subject: [PATCH 1/6] Fix ByteType support when writing and reading with JDBC
 based data sources. Relevant unit tests are added.

Issues: Writing a DataFrame with a ByteType column fails when using the JDBC
connector for SQL Server. Append and read of such tables also fail. The
problem is due to:

1. (Write path) Incorrect mapping of ByteType in getCommonJDBCType() in
   JdbcUtils.scala, where ByteType gets mapped to the SQL text "BYTE":

     case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))

   It should be mapped to TINYINT instead. Also, in getCatalystType() (the
   JDBC-to-Catalyst type mapping), TINYINT is mapped to IntegerType, while it
   should be mapped to ByteType. Mapping to IntegerType is acceptable from an
   upcasting point of view, but it leads to a 4-byte allocation rather than
   1 byte per value.

2. (Read path) The read path ends up calling makeGetter(dt: DataType,
   metadata: Metadata), which sets each value in the RDD row according to its
   data type. There is no mapping for ByteType here, so reads will fail once
   getCatalystType() is fixed.

Note: these issues were found when reading/writing with SQL Server.

Error seen when writing a table (JDBC write failed):

  com.microsoft.sqlserver.jdbc.SQLServerException: Column, parameter, or
  variable #2: Cannot find data type BYTE.
    com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:254)
    com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1608)
    com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:859)
    ..
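For context, a minimal repro sketch (not part of the patch) of the kind of
write that triggers the error above. The JDBC URL, table name, and credentials
are hypothetical placeholders; it assumes a reachable SQL Server instance and
the Microsoft JDBC driver on the classpath.

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{ByteType, StructField, StructType}

    val spark = SparkSession.builder().appName("bytetype-repro").getOrCreate()

    // A one-column DataFrame whose column is ByteType.
    val schema = StructType(Seq(StructField("b", ByteType)))
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row(42.toByte))), schema)

    // Before this fix, the generated DDL contains the non-existent SQL Server
    // type BYTE, so the write fails with "Cannot find data type BYTE".
    df.write
      .format("jdbc")
      .option("url", "jdbc:sqlserver://localhost;databaseName=test") // placeholder
      .option("dbtable", "byte_tbl")                                 // placeholder
      .option("user", "sa")                                          // placeholder
      .option("password", "********")                                // placeholder
      .mode("append")
      .save()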
---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala  |  5 +++++
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala  |  7 ++++++-
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++++++++++
 3 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 86a27b5afc250..6b7168d3b3cc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -439,6 +439,10 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setLong(pos, rs.getLong(pos + 1))
 
+    case ByteType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setByte(pos, rs.getByte(pos + 1))
+
     case ShortType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setShort(pos, rs.getShort(pos + 1))
@@ -797,6 +801,7 @@ object JdbcUtils extends Logging {
       tableSchema: StructType,
       customSchema: String,
       nameEquality: Resolver): StructType = {
+
     if (null != customSchema && customSchema.nonEmpty) {
       val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 29500cf2afbc0..19789722ed06d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -30,7 +30,11 @@ private object MsSqlServerDialect extends JdbcDialect {
       // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
       Option(StringType)
     } else {
-      None
+      val answer = sqlType match {
+        case java.sql.Types.TINYINT => Option(ByteType)
+        case _ => None
+      }
+      answer
     }
   }
 
@@ -39,6 +43,7 @@ private object MsSqlServerDialect extends JdbcDialect {
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
+    case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT))
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5f27e75addcff..fc4df1eeb5bb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -895,6 +895,17 @@ class JDBCSuite extends QueryTest
       "BIT")
     assert(msSqlServerDialect.getJDBCType(BinaryType).map(_.databaseTypeDefinition).get ==
       "VARBINARY(MAX)")
+    assert(msSqlServerDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get ==
+      "TINYINT")
+  }
+
+  test("MsSqlServerDialect catalyst type mapping") {
+    val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
+    val metadata = new MetadataBuilder().putLong("scale", 1)
+
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.TINYINT,"TINYINT",1,metadata).get ==
+      ByteType)
+  }
 
   test("table exists query by jdbc dialect") {

From 3edc863c087721f15f764d119e4be3435af7a738 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Tue, 25 Jun 2019 13:08:56 -0700
Subject: [PATCH 2/6] Fix ShortType and FloatType when appending to SQL Server
 tables

ShortType and FloatType are not correctly mapped to the right JDBC types when
using the JDBC connector. This results in tables and Spark DataFrames being
created with unintended types. Some example issues:

- Writing from a DataFrame with a ShortType column results in a SQL table
  whose column type is INTEGER as opposed to SMALLINT, and thus a larger
  table than expected. Reading results in a DataFrame with type IntegerType
  as opposed to ShortType.

- FloatType has an issue on the read path. On the write path the Spark data
  type FloatType is correctly mapped to the JDBC-equivalent data type REAL,
  but on the read path, when JDBC data types are converted to Catalyst data
  types (getCatalystType), REAL gets incorrectly mapped to DoubleType rather
  than FloatType.
---
 .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala   | 5 ++++-
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 19789722ed06d..65a41870896e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -31,7 +31,9 @@ private object MsSqlServerDialect extends JdbcDialect {
       Option(StringType)
     } else {
       val answer = sqlType match {
-        case java.sql.Types.TINYINT => Option(ByteType)
+        case java.sql.Types.TINYINT => Option(ByteType)
+        case java.sql.Types.SMALLINT => Option(ShortType)
+        case java.sql.Types.REAL => Option(FloatType)
         case _ => None
       }
       answer
@@ -44,6 +46,7 @@ private object MsSqlServerDialect extends JdbcDialect {
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
     case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT))
+    case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index fc4df1eeb5bb3..359b164d1f4a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -897,6 +897,9 @@ class JDBCSuite extends QueryTest
       "VARBINARY(MAX)")
     assert(msSqlServerDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get ==
       "TINYINT")
+
+    assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
+      "SMALLINT")
   }
 
   test("MsSqlServerDialect catalyst type mapping") {
@@ -905,6 +908,10 @@ class JDBCSuite extends QueryTest
 
     assert(msSqlServerDialect.getCatalystType(java.sql.Types.TINYINT,"TINYINT",1,metadata).get ==
       ByteType)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT,"SMALLINT",1,metadata).get ==
+      ShortType)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL,"REAL",1,metadata).get ==
+      FloatType)
   }
 

From b874527432c3b56d67b217127c0a0a77f80e0ea5 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Wed, 3 Jul 2019 16:13:39 -0700
Subject: [PATCH 3/6] Addressed review comments on the PR

1. Fixed test cases related to TINYINT and FLOAT in
   MsSqlServerIntegrationSuite.scala.
2. Code cleanup in MsSqlServerDialect.scala.
3. Fixed an issue in JdbcUtils where a TINYINT value was being written as an
   Int, failing the 'basic write' test cases in the integration suite (see
   the sketch below).
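To make item 3 concrete, a condensed sketch of the setter shape involved
(mirroring the JdbcUtils change below, not a verbatim excerpt of it): the
ByteType branch must bind the parameter with setByte so the driver sends a
TINYINT, instead of widening to INT via setInt.

    import java.sql.PreparedStatement
    import org.apache.spark.sql.Row

    // Shape of the per-type setter used when writing rows over JDBC.
    type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit

    // Before the fix: stmt.setInt(pos + 1, row.getByte(pos)) widened the
    // value to INT. After the fix, the byte is bound as a byte.
    val byteSetter: JDBCValueSetter =
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setByte(pos + 1, row.getByte(pos))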
---
 .../sql/jdbc/MsSqlServerIntegrationSuite.scala | 18 ++++++++++--------
 .../execution/datasources/jdbc/JdbcUtils.scala |  2 +-
 .../spark/sql/jdbc/MsSqlServerDialect.scala    | 15 +++++++--------
 3 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 82ce16c2b7e5a..84b02e66822be 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -118,26 +118,28 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     val row = rows(0)
     val types = row.toSeq.map(x => x.getClass.toString)
     assert(types.length == 12)
+
     assert(types(0).equals("class java.lang.Boolean"))
-    assert(types(1).equals("class java.lang.Integer"))
-    assert(types(2).equals("class java.lang.Integer"))
+    assert(types(1).equals("class java.lang.Byte"))
+    assert(types(2).equals("class java.lang.Short"))
     assert(types(3).equals("class java.lang.Integer"))
     assert(types(4).equals("class java.lang.Long"))
     assert(types(5).equals("class java.lang.Double"))
-    assert(types(6).equals("class java.lang.Double"))
-    assert(types(7).equals("class java.lang.Double"))
+    assert(types(6).equals("class java.lang.Float"))
+    assert(types(7).equals("class java.lang.Float"))
     assert(types(8).equals("class java.math.BigDecimal"))
     assert(types(9).equals("class java.math.BigDecimal"))
     assert(types(10).equals("class java.math.BigDecimal"))
     assert(types(11).equals("class java.math.BigDecimal"))
     assert(row.getBoolean(0) == false)
-    assert(row.getInt(1) == 255)
-    assert(row.getInt(2) == 32767)
+
+    assert(row.getByte(1) == 255.toByte)
+    assert(row.getShort(2) == 32767)
     assert(row.getInt(3) == 2147483647)
     assert(row.getLong(4) == 9223372036854775807L)
     assert(row.getDouble(5) == 1.2345678901234512E14) // float = float(53) has 15-digits precision
-    assert(row.getDouble(6) == 1.23456788103168E14) // float(24) has 7-digits precision
-    assert(row.getDouble(7) == 1.23456788103168E14) // real = float(24)
+    assert(row.getFloat(6) == 1.23456788103168E14) // float(24) has 7-digits precision
+    assert(row.getFloat(7) == 1.23456788103168E14) // real = float(24)
     assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
     assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
     assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 6b7168d3b3cc0..4a9b69ebbf6e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -554,7 +554,7 @@ object JdbcUtils extends Logging {
 
     case ByteType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
-        stmt.setInt(pos + 1, row.getByte(pos))
+        stmt.setByte(pos + 1, row.getByte(pos))
 
     case BooleanType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 65a41870896e4..e22f48fbcb095 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -30,13 +30,12 @@ private object MsSqlServerDialect extends JdbcDialect {
       // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
       Option(StringType)
     } else {
-      val answer = sqlType match {
-        case java.sql.Types.TINYINT => Option(ByteType)
-        case java.sql.Types.SMALLINT => Option(ShortType)
-        case java.sql.Types.REAL => Option(FloatType)
-        case _ => None
+      sqlType match {
+        case java.sql.Types.TINYINT => Some(ByteType)
+        case java.sql.Types.SMALLINT => Some(ShortType)
+        case java.sql.Types.REAL => Some(FloatType)
+        case _ => None
       }
-      answer
     }
   }
 
@@ -45,8 +44,8 @@ private object MsSqlServerDialect extends JdbcDialect {
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
-    case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT))
-    case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+    case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT))
+    case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
     case _ => None
   }
 

From 8a7d960833799ba4a20e90b21e564b1d8a61f424 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Mon, 8 Jul 2019 10:33:38 -0700
Subject: [PATCH 4/6] Fixed styling issues from the dev/stylecheck run

---
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 359b164d1f4a9..cc69161c6f25b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -906,12 +906,12 @@ class JDBCSuite extends QueryTest
     val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
     val metadata = new MetadataBuilder().putLong("scale", 1)
 
-    assert(msSqlServerDialect.getCatalystType(java.sql.Types.TINYINT,"TINYINT",1,metadata).get ==
-      ByteType)
-    assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT,"SMALLINT",1,metadata).get ==
-      ShortType)
-    assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL,"REAL",1,metadata).get ==
-      FloatType)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1,
+      metadata).get == ByteType)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
+      metadata).get == ShortType)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
+      metadata).get == FloatType)
   }
 

From 6bf6e3df4892ee319470b63ab905c6cf04624ba6 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Mon, 8 Jul 2019 13:24:23 -0700
Subject: [PATCH 5/6] Fixing issues in the original commit

The earlier change (the ByteType case added to makeGetter) was not in the
right place; this should have surfaced as a build failure earlier.
---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 4a9b69ebbf6e6..b67d8aada2265 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -439,10 +439,6 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setLong(pos, rs.getLong(pos + 1))
 
-    case ByteType =>
-      (rs: ResultSet, row: InternalRow, pos: Int) =>
-        row.setByte(pos, rs.getByte(pos + 1))
-
     case ShortType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setShort(pos, rs.getShort(pos + 1))

From f2700eb517246a72873090dffd35635e93283015 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Tue, 9 Jul 2019 10:53:21 -0700
Subject: [PATCH 6/6] Fixing styling issues

---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala     |  1 -
 .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 10 +++++-----
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala    |  5 +----
 3 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b67d8aada2265..5c4a38641f624 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -797,7 +797,6 @@ object JdbcUtils extends Logging {
       tableSchema: StructType,
       customSchema: String,
       nameEquality: Resolver): StructType = {
-
     if (null != customSchema && customSchema.nonEmpty) {
       val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index e22f48fbcb095..1288976b0707d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -30,11 +30,11 @@ private object MsSqlServerDialect extends JdbcDialect {
       // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
       Option(StringType)
     } else {
-        sqlType match {
-          case java.sql.Types.TINYINT => Some(ByteType)
-          case java.sql.Types.SMALLINT => Some(ShortType)
-          case java.sql.Types.REAL => Some(FloatType)
-          case _ => None
+      sqlType match {
+        case java.sql.Types.TINYINT => Some(ByteType)
+        case java.sql.Types.SMALLINT => Some(ShortType)
+        case java.sql.Types.REAL => Some(FloatType)
+        case _ => None
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index cc69161c6f25b..bfc94d57aa505 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -897,22 +897,19 @@ class JDBCSuite extends QueryTest
       "VARBINARY(MAX)")
     assert(msSqlServerDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get ==
       "TINYINT")
-
     assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
       "SMALLINT")
   }
 
-  test("MsSqlServerDialect catalyst type mapping") {
+  test("SPARK-28151 MsSqlServerDialect catalyst type mapping") {
     val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
     val metadata = new MetadataBuilder().putLong("scale", 1)
-
     assert(msSqlServerDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1,
       metadata).get == ByteType)
     assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
       metadata).get == ShortType)
     assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
       metadata).get == FloatType)
-
   }
 
   test("table exists query by jdbc dialect") {