Skip to content

Commit 02eeaff

Browse files
author
YIHAODIAN\wangshuangshuang
committed
SPARK-19726: alwaysNullable as a parameter for getSchema
1 parent dde9123 commit 02eeaff

File tree

3 files changed

+18
-19
lines changed

3 files changed

+18
-19
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ object JDBCRDD extends Logging {
6161
try {
6262
val rs = statement.executeQuery()
6363
try {
64-
JdbcUtils.getSchema(rs, dialect)
64+
JdbcUtils.getSchema(rs, dialect, true)
6565
} finally {
6666
rs.close()
6767
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -264,19 +264,19 @@ object JdbcUtils extends Logging {
264264
}
265265

266266
/**
267-
* Takes a [[ResultSet]] and returns its Catalyst schema.
268-
*
269-
* @return A [[StructType]] giving the Catalyst schema.
270-
* @throws SQLException if the schema contains an unsupported type.
271-
*/
272-
def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
267+
* Takes a [[ResultSet]] and returns its Catalyst schema.
268+
*
269+
* @param alwaysNullable - If true, Spark will propagate null
270+
* to underlying DB engine instead of using type default value
271+
* @return A [[StructType]] giving the Catalyst schema.
272+
* @throws SQLException if the schema contains an unsupported type.
273+
*/
274+
def getSchema(resultSet: ResultSet,
275+
dialect: JdbcDialect,
276+
alwaysNullable: Boolean = false): StructType = {
273277
val rsmd = resultSet.getMetaData
274278
val ncols = rsmd.getColumnCount
275279
val fields = new Array[StructField](ncols)
276-
// if true, spark will propagate null to underlying
277-
// DB engine instead of using type default value
278-
val alwaysNullable = true
279-
280280
var i = 0
281281
while (i < ncols) {
282282
val columnName = rsmd.getColumnLabel(i + 1)
@@ -294,12 +294,10 @@ object JdbcUtils extends Logging {
294294
rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
295295
}
296296
}
297-
val nullable = {
298-
if (alwaysNullable)
299-
alwaysNullable
300-
else
301-
rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
302-
}
297+
val nullable = if (alwaysNullable)
298+
alwaysNullable
299+
else
300+
rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
303301
val metadata = new MetadataBuilder()
304302
.putString("name", columnName)
305303
.putLong("scale", fieldScale)

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ package org.apache.spark.sql.jdbc
2020
import java.sql.{Date, DriverManager, Timestamp}
2121
import java.util.Properties
2222

23-
import org.apache.spark.SparkException
24-
2523
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
24+
2625
import org.scalatest.BeforeAndAfter
26+
27+
import org.apache.spark.SparkException
2728
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
2829
import org.apache.spark.sql.catalyst.parser.ParseException
2930
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}

0 commit comments

Comments (0)