Skip to content

Commit 28fc043

Browse files
committed
[SPARK-28152][SQL][FOLLOWUP] Add a legacy conf for old MsSqlServerDialect numeric mapping
### What changes were proposed in this pull request?

This is a follow-up for #25248.

### Why are the changes needed?

The new behavior cannot access an existing table which was created by the old behavior. This PR provides a way for existing users to opt out of the new behavior.

### Does this PR introduce any user-facing change?

Yes. This will fix the broken behavior on existing tables.

### How was this patch tested?

Pass the Jenkins and manually run the JDBC integration tests.
```
build/mvn install -DskipTests
build/mvn -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12 test
```

Closes #27184 from dongjoon-hyun/SPARK-28152-CONF.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8ce7962 commit 28fc043

File tree

4 files changed

+100
-41
lines changed

4 files changed

+100
-41
lines changed

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.math.BigDecimal
2121
import java.sql.{Connection, Date, Timestamp}
2222
import java.util.Properties
2323

24+
import org.apache.spark.sql.internal.SQLConf
2425
import org.apache.spark.tags.DockerTest
2526

2627
@DockerTest
@@ -112,36 +113,58 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
112113
}
113114

114115
test("Numeric types") {
115-
val df = spark.read.jdbc(jdbcUrl, "numbers", new Properties)
116-
val rows = df.collect()
117-
assert(rows.length == 1)
118-
val row = rows(0)
119-
val types = row.toSeq.map(x => x.getClass.toString)
120-
assert(types.length == 12)
121-
assert(types(0).equals("class java.lang.Boolean"))
122-
assert(types(1).equals("class java.lang.Integer"))
123-
assert(types(2).equals("class java.lang.Short"))
124-
assert(types(3).equals("class java.lang.Integer"))
125-
assert(types(4).equals("class java.lang.Long"))
126-
assert(types(5).equals("class java.lang.Double"))
127-
assert(types(6).equals("class java.lang.Float"))
128-
assert(types(7).equals("class java.lang.Float"))
129-
assert(types(8).equals("class java.math.BigDecimal"))
130-
assert(types(9).equals("class java.math.BigDecimal"))
131-
assert(types(10).equals("class java.math.BigDecimal"))
132-
assert(types(11).equals("class java.math.BigDecimal"))
133-
assert(row.getBoolean(0) == false)
134-
assert(row.getInt(1) == 255)
135-
assert(row.getShort(2) == 32767)
136-
assert(row.getInt(3) == 2147483647)
137-
assert(row.getLong(4) == 9223372036854775807L)
138-
assert(row.getDouble(5) == 1.2345678901234512E14) // float = float(53) has 15-digits precision
139-
assert(row.getFloat(6) == 1.23456788103168E14) // float(24) has 7-digits precision
140-
assert(row.getFloat(7) == 1.23456788103168E14) // real = float(24)
141-
assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
142-
assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
143-
assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
144-
assert(row.getAs[BigDecimal](11).equals(new BigDecimal("214748.3647")))
116+
Seq(true, false).foreach { flag =>
117+
withSQLConf(SQLConf.LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED.key -> s"$flag") {
118+
val df = spark.read.jdbc(jdbcUrl, "numbers", new Properties)
119+
val rows = df.collect()
120+
assert(rows.length == 1)
121+
val row = rows(0)
122+
val types = row.toSeq.map(x => x.getClass.toString)
123+
assert(types.length == 12)
124+
assert(types(0).equals("class java.lang.Boolean"))
125+
assert(types(1).equals("class java.lang.Integer"))
126+
if (flag) {
127+
assert(types(2).equals("class java.lang.Integer"))
128+
} else {
129+
assert(types(2).equals("class java.lang.Short"))
130+
}
131+
assert(types(3).equals("class java.lang.Integer"))
132+
assert(types(4).equals("class java.lang.Long"))
133+
assert(types(5).equals("class java.lang.Double"))
134+
if (flag) {
135+
assert(types(6).equals("class java.lang.Double"))
136+
assert(types(7).equals("class java.lang.Double"))
137+
} else {
138+
assert(types(6).equals("class java.lang.Float"))
139+
assert(types(7).equals("class java.lang.Float"))
140+
}
141+
assert(types(8).equals("class java.math.BigDecimal"))
142+
assert(types(9).equals("class java.math.BigDecimal"))
143+
assert(types(10).equals("class java.math.BigDecimal"))
144+
assert(types(11).equals("class java.math.BigDecimal"))
145+
assert(row.getBoolean(0) == false)
146+
assert(row.getInt(1) == 255)
147+
if (flag) {
148+
assert(row.getInt(2) == 32767)
149+
} else {
150+
assert(row.getShort(2) == 32767)
151+
}
152+
assert(row.getInt(3) == 2147483647)
153+
assert(row.getLong(4) == 9223372036854775807L)
154+
assert(row.getDouble(5) == 1.2345678901234512E14) // float(53) has 15-digits precision
155+
if (flag) {
156+
assert(row.getDouble(6) == 1.23456788103168E14) // float(24) has 7-digits precision
157+
assert(row.getDouble(7) == 1.23456788103168E14) // real = float(24)
158+
} else {
159+
assert(row.getFloat(6) == 1.23456788103168E14) // float(24) has 7-digits precision
160+
assert(row.getFloat(7) == 1.23456788103168E14) // real = float(24)
161+
}
162+
assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
163+
assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
164+
assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
165+
assert(row.getAs[BigDecimal](11).equals(new BigDecimal("214748.3647")))
166+
}
167+
}
145168
}
146169

147170
test("Date types") {

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2161,6 +2161,13 @@ object SQLConf {
21612161
.booleanConf
21622162
.createWithDefault(true)
21632163

2164+
val LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED =
2165+
buildConf("spark.sql.legacy.mssqlserver.numericMapping.enabled")
2166+
.internal()
2167+
.doc("When true, use legacy MsSqlServer SMALLINT and REAL type mapping.")
2168+
.booleanConf
2169+
.createWithDefault(false)
2170+
21642171
/**
21652172
* Holds information about keys that have been deprecated.
21662173
*
@@ -2417,6 +2424,9 @@ class SQLConf extends Serializable with Logging {
24172424

24182425
def addDirectoryRecursiveEnabled: Boolean = getConf(LEGACY_ADD_DIRECTORY_USING_RECURSIVE)
24192426

2427+
def legacyMsSqlServerNumericMappingEnabled: Boolean =
2428+
getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
2429+
24202430
/**
24212431
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
24222432
* identifiers are equal.

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
1919

2020
import java.util.Locale
2121

22+
import org.apache.spark.sql.internal.SQLConf
2223
import org.apache.spark.sql.types._
2324

2425

@@ -33,10 +34,14 @@ private object MsSqlServerDialect extends JdbcDialect {
3334
// String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
3435
Option(StringType)
3536
} else {
36-
sqlType match {
37-
case java.sql.Types.SMALLINT => Some(ShortType)
38-
case java.sql.Types.REAL => Some(FloatType)
39-
case _ => None
37+
if (SQLConf.get.legacyMsSqlServerNumericMappingEnabled) {
38+
None
39+
} else {
40+
sqlType match {
41+
case java.sql.Types.SMALLINT => Some(ShortType)
42+
case java.sql.Types.REAL => Some(FloatType)
43+
case _ => None
44+
}
4045
}
4146
}
4247
}
@@ -46,7 +51,8 @@ private object MsSqlServerDialect extends JdbcDialect {
4651
case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
4752
case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
4853
case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
49-
case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
54+
case ShortType if !SQLConf.get.legacyMsSqlServerNumericMappingEnabled =>
55+
Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
5056
case _ => None
5157
}
5258

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -884,17 +884,37 @@ class JDBCSuite extends QueryTest
884884
"BIT")
885885
assert(msSqlServerDialect.getJDBCType(BinaryType).map(_.databaseTypeDefinition).get ==
886886
"VARBINARY(MAX)")
887-
assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
888-
"SMALLINT")
887+
Seq(true, false).foreach { flag =>
888+
withSQLConf(SQLConf.LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED.key -> s"$flag") {
889+
if (SQLConf.get.legacyMsSqlServerNumericMappingEnabled) {
890+
assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).isEmpty)
891+
} else {
892+
assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
893+
"SMALLINT")
894+
}
895+
}
896+
}
889897
}
890898

891899
test("SPARK-28152 MsSqlServerDialect catalyst type mapping") {
892900
val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
893901
val metadata = new MetadataBuilder().putLong("scale", 1)
894-
assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
895-
metadata).get == ShortType)
896-
assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
897-
metadata).get == FloatType)
902+
903+
Seq(true, false).foreach { flag =>
904+
withSQLConf(SQLConf.LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED.key -> s"$flag") {
905+
if (SQLConf.get.legacyMsSqlServerNumericMappingEnabled) {
906+
assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
907+
metadata).isEmpty)
908+
assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
909+
metadata).isEmpty)
910+
} else {
911+
assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
912+
metadata).get == ShortType)
913+
assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
914+
metadata).get == FloatType)
915+
}
916+
}
917+
}
898918
}
899919

900920
test("table exists query by jdbc dialect") {

0 commit comments

Comments
 (0)