From c7abad3bfdbf654bcd35425ce5053e4a203f3cdf Mon Sep 17 00:00:00 2001
From: ramulla
Date: Sat, 22 Aug 2015 14:03:32 -0400
Subject: [PATCH 01/15] Added stringDataType option to JDBC connection
 properties

---
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala  | 8 ++++++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ce8744b53175..4f27ae49036d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -270,7 +270,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
       // Create the table if the table didn't exist.
       if (!tableExists) {
-        val schema = JdbcUtils.schemaString(df, url)
+        val schema = JdbcUtils.schemaString(df, url, connectionProperties)
         val sql = s"CREATE TABLE $table ($schema)"
         conn.prepareStatement(sql).executeUpdate()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 26788b2a4fd6..9fd20c992e23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -161,9 +161,13 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(df: DataFrame, url: String): String = {
+  def schemaString(
+      df: DataFrame,
+      url: String,
+      properties: Properties = new Properties()): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
+    val stringDataType = properties.getProperty("dbStringDataType", "TEXT")
     df.schema.fields foreach { field => {
       val name = field.name
       val typ: String =
@@ -176,7 +180,7 @@
         case ShortType => "INTEGER"
         case ByteType => "BYTE"
         case BooleanType => "BIT(1)"
-        case StringType => "TEXT"
+        case StringType => stringDataType
         case BinaryType => "BLOB"
         case TimestampType => "TIMESTAMP"
         case DateType => "DATE"

From f1d0b9ea6a8d7c0f0dd2def590bdc1d5c85ec75e Mon Sep 17 00:00:00 2001
From: ramulla
Date: Thu, 27 Aug 2015 01:03:07 -0400
Subject: [PATCH 02/15] Added maxlength to field metadata so string types can
 use VARCHAR.

Changed getJDBCType to take two parameters, DataType and Metadata.

Usage in Scala:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder
val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
df.withColumn("colName", col("colName").as("colName", metadata))
---
 .../apache/spark/sql/DataFrameWriter.scala    |  2 +-
 .../datasources/jdbc/JdbcUtils.scala          | 10 ++-
 .../apache/spark/sql/jdbc/JdbcDialects.scala  | 72 +++++++++++++++++--
 3 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4f27ae49036d..ce8744b53175 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -270,7 +270,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
       // Create the table if the table didn't exist.
if (!tableExists) { - val schema = JdbcUtils.schemaString(df, url, connectionProperties) + val schema = JdbcUtils.schemaString(df, url) val sql = s"CREATE TABLE $table ($schema)" conn.prepareStatement(sql).executeUpdate() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 9fd20c992e23..05c97a76035c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -163,15 +163,13 @@ object JdbcUtils extends Logging { */ def schemaString( df: DataFrame, - url: String, - properties: Properties = new Properties()): String = { + url: String): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) - val stringDataType = properties.getProperty("dbStringDataType", "TEXT") df.schema.fields foreach { field => { val name = field.name val typ: String = - dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse( + dialect.getJDBCType(field.dataType, field.metadata).map(_.databaseTypeDefinition).getOrElse( field.dataType match { case IntegerType => "INTEGER" case LongType => "BIGINT" @@ -180,7 +178,7 @@ object JdbcUtils extends Logging { case ShortType => "INTEGER" case ByteType => "BYTE" case BooleanType => "BIT(1)" - case StringType => stringDataType + case StringType => "TEXT" case BinaryType => "BLOB" case TimestampType => "TIMESTAMP" case DateType => "DATE" @@ -203,7 +201,7 @@ object JdbcUtils extends Logging { properties: Properties = new Properties()) { val dialect = JdbcDialects.get(url) val nullTypes: Array[Int] = df.schema.fields.map { field => - dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse( + dialect.getJDBCType(field.dataType, field.metadata).map(_.jdbcNullType).getOrElse( field.dataType match { case IntegerType => java.sql.Types.INTEGER case LongType => java.sql.Types.BIGINT diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 8849fc2f1f0e..aec2e0ac0ca3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -77,9 +77,10 @@ abstract class JdbcDialect { /** * Retrieve the jdbc / sql type for a given datatype. * @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]]) + * @param md The metadata * @return The new JdbcType if there is an override for this DataType */ - def getJDBCType(dt: DataType): Option[JdbcType] = None + def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = None /** * Quotes the identifier. 
This is used to put quotes around the identifier in case the column @@ -159,8 +160,8 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect { dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption } - override def getJDBCType(dt: DataType): Option[JdbcType] = { - dialects.flatMap(_.getJDBCType(dt)).headOption + override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { + dialects.flatMap(_.getJDBCType(dt, md)).headOption } } @@ -191,7 +192,7 @@ case object PostgresDialect extends JdbcDialect { } else None } - override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match { case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR)) case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY)) case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) @@ -222,3 +223,66 @@ case object MySQLDialect extends JdbcDialect { s"`$colName`" } } + +/** + * :: DeveloperApi :: + * Default DB2 dialect, mapping string/boolean on write to valid DB2 types. + * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1). + */ +@DeveloperApi +case object DB2Dialect extends JdbcDialect { + + override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2") + + override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { + if (dt == StringType && md.contains("maxlength")) { + Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) + } else if (dt == StringType ) { + Some(JdbcType("CLOB", java.sql.Types.CLOB)) + } else if (dt == BooleanType ) { + Some(JdbcType("CHAR(1)", java.sql.Types.CHAR)) + } else None + } +} + +/** + * :: DeveloperApi :: + * Default Oracle dialect, mapping string/boolean on write to valid Oracle types. + */ +@DeveloperApi +case object OracleDialect extends JdbcDialect { + + override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") + + override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { + if (dt == StringType && md.contains("maxlength")) { + Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) + } else if (dt == StringType ) { + Some(JdbcType("CLOB", java.sql.Types.CLOB)) + } else if (dt == BooleanType ) { + Some(JdbcType("CHAR(1)", java.sql.Types.CHAR)) + } else None + } +} + +/** + * :: DeveloperApi :: + * Default Netezza dialect, mapping string/boolean on write to valid Netezza types. + */ +@DeveloperApi +case object NetezzaDialect extends JdbcDialect { + + override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza") + + override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { + if (dt == StringType && md.contains("maxlength")) { + Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) + } else if (dt == StringType ) { + Some(JdbcType("VARCHAR(255)", java.sql.Types.CHAR)) + } else if (dt == BinaryType ) { + Some(JdbcType("BYTEINT", java.sql.Types.BINARY)) + } else if (dt == BooleanType ) { + Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) + } else None + } +} \ No newline at end of file From faff50776011aed42559c1ffe61923ebc5e57999 Mon Sep 17 00:00:00 2001 From: ramulla Date: Thu, 27 Aug 2015 08:39:56 -0400 Subject: [PATCH 03/15] Added maxlength for field metadata so string types can use for VARCHAR. 
Changed getJDBCType to take two parameters, DataType and Metadata.

Usage in Scala:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder
val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
df.withColumn("colName", col("colName").as("colName", metadata))
---
 .../main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index aec2e0ac0ca3..143f1abaabcc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -126,6 +126,9 @@ object JdbcDialects {
 
   registerDialect(MySQLDialect)
   registerDialect(PostgresDialect)
+  registerDialect(OracleDialect)
+  registerDialect(DB2Dialect)
+  registerDialect(NetezzaDialect)
 
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.

From c4b44774b18d26d5b04f72d4ce8edd1216d9c970 Mon Sep 17 00:00:00 2001
From: ramulla
Date: Thu, 27 Aug 2015 10:56:02 -0400
Subject: [PATCH 04/15] Added maxlength to field metadata so string types can
 use VARCHAR.

Changed getJDBCType to take two parameters, DataType and Metadata.

Usage in Scala:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder
val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
df.withColumn("colName", col("colName").as("colName", metadata))
---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edac0848c3b..d6c17c3e5dc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -407,6 +407,9 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   test("Default jdbc dialect registration") {
     assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect)
     assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
+    assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
+    assert(JdbcDialects.get("jdbc:oracle://127.0.0.1/db") == OracleDialect)
+    assert(JdbcDialects.get("jdbc:netezza://127.0.0.1/db") == NetezzaDialect)
     assert(JdbcDialects.get("test.invalid") == NoopDialect)
   }

From 35e61f3d3d254135ae3df454c0ee67cec9d4986f Mon Sep 17 00:00:00 2001
From: ramulla
Date: Mon, 14 Sep 2015 16:38:23 -0400
Subject: [PATCH 05/15] Added method override for getJDBCType to take two
 parameters (DataType and Metadata) and removed the DB2 JdbcDialect, as the
 DB2 team is working on that dialect.

---
 .../datasources/jdbc/JdbcUtils.scala          | 19 ++++++++---
 .../apache/spark/sql/jdbc/JdbcDialects.scala  | 33 +++++--------------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala |  1 -
 3 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 05c97a76035c..2e5685ad3352 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -161,15 +161,18 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
*/ - def schemaString( - df: DataFrame, - url: String): String = { + def schemaString(df: DataFrame, url: String): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) df.schema.fields foreach { field => { val name = field.name - val typ: String = - dialect.getJDBCType(field.dataType, field.metadata).map(_.databaseTypeDefinition).getOrElse( + /* If field metadata exists then call getJDBCType with field data type and metadata else just call with field data type + * To override field metadata in Scala + * import org.apache.spark.sql.types.MetadataBuilder + * val metadata = new MetadataBuilder().putLong("maxlength", 10).build() + * df.withColumn("colName", col("colName").as("colName", metadata) + */ + val typ: String = dialect.getJDBCType(field.dataType, field.metadata).map(_.databaseTypeDefinition).getOrElse( field.dataType match { case IntegerType => "INTEGER" case LongType => "BIGINT" @@ -201,6 +204,12 @@ object JdbcUtils extends Logging { properties: Properties = new Properties()) { val dialect = JdbcDialects.get(url) val nullTypes: Array[Int] = df.schema.fields.map { field => + /* If field metadata exists then call getJDBCType with field data type and metadata else just call with field data type + * To override field metadata in Scala + * import org.apache.spark.sql.types.MetadataBuilder + * val metadata = new MetadataBuilder().putLong("maxlength", 10).build() + * df.withColumn("colName", col("colName").as("colName", metadata) + */ dialect.getJDBCType(field.dataType, field.metadata).map(_.jdbcNullType).getOrElse( field.dataType match { case IntegerType => java.sql.Types.INTEGER diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 143f1abaabcc..928404c0df9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -74,6 +74,13 @@ abstract class JdbcDialect { def getCatalystType( sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + /** + * Retrieve the jdbc / sql type for a given datatype. + * @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]]) + * @return The new JdbcType if there is an override for this DataType + */ + def getJDBCType(dt: DataType): Option[JdbcType] = None + /** * Retrieve the jdbc / sql type for a given datatype. * @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]]) @@ -127,7 +134,6 @@ object JdbcDialects { registerDialect(MySQLDialect) registerDialect(PostgresDialect) registerDialect(OracleDialect) - registerDialect(DB2Dialect) registerDialect(NetezzaDialect) /** @@ -163,8 +169,8 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect { dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption } - override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { - dialects.flatMap(_.getJDBCType(dt, md)).headOption + override def getJDBCType(dt: DataType): Option[JdbcType] = { + dialects.flatMap(_.getJDBCType(dt)).headOption } } @@ -227,27 +233,6 @@ case object MySQLDialect extends JdbcDialect { } } -/** - * :: DeveloperApi :: - * Default DB2 dialect, mapping string/boolean on write to valid DB2 types. - * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1). 
- */ -@DeveloperApi -case object DB2Dialect extends JdbcDialect { - - override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2") - - override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { - if (dt == StringType && md.contains("maxlength")) { - Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) - } else if (dt == StringType ) { - Some(JdbcType("CLOB", java.sql.Types.CLOB)) - } else if (dt == BooleanType ) { - Some(JdbcType("CHAR(1)", java.sql.Types.CHAR)) - } else None - } -} - /** * :: DeveloperApi :: * Default Oracle dialect, mapping string/boolean on write to valid Oracle types. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index d6c17c3e5dc4..430270f06059 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -407,7 +407,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("Default jdbc dialect registration") { assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect) assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) - assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("jdbc:oracle://127.0.0.1/db") == OracleDialect) assert(JdbcDialects.get("jdbc:netezza://127.0.0.1/db") == NetezzaDialect) assert(JdbcDialects.get("test.invalid") == NoopDialect) From 0cfeefa5edead4e0ee988eb25d5b21825b25f6d8 Mon Sep 17 00:00:00 2001 From: ramulla Date: Sat, 26 Sep 2015 09:16:01 -0400 Subject: [PATCH 06/15] Sync Master --- .../datasources/jdbc/JdbcUtils.scala | 9 ++---- .../apache/spark/sql/jdbc/JdbcDialects.scala | 28 ++++++++++++------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5367e9838075..2e5685ad3352 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -42,13 +42,10 @@ object JdbcUtils extends Logging { /** * Returns true if the table already exists in the JDBC database. */ - def tableExists(conn: Connection, url: String, table: String): Boolean = { - val dialect = JdbcDialects.get(url) - + def tableExists(conn: Connection, table: String): Boolean = { // Somewhat hacky, but there isn't a good way to identify whether a table exists for all - // SQL database systems using JDBC meta data calls, considering "table" could also include - // the database name. Query used to find table exists can be overriden by the dialects. - Try(conn.prepareStatement(dialect.getTableExistsQuery(table)).executeQuery()).isSuccess + // SQL database systems, considering "table" could also include the database name. 
+ Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 4220bccfb976..e70957e1a444 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -133,12 +133,10 @@ object JdbcDialects { registerDialect(MySQLDialect) registerDialect(PostgresDialect) -<<<<<<< HEAD + registerDialect(DB2Dialect) + registerDialect(MsSqlServerDialect) registerDialect(OracleDialect) registerDialect(NetezzaDialect) -======= - registerDialect(DB2Dialect) ->>>>>>> upstream/master /** * Fetch the JdbcDialect class corresponding to a given database url. @@ -244,9 +242,7 @@ case object MySQLDialect extends JdbcDialect { */ @DeveloperApi case object DB2Dialect extends JdbcDialect { - override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2") - override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match { case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB)) case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR)) @@ -254,15 +250,29 @@ case object DB2Dialect extends JdbcDialect { } } +/** + * :: DeveloperApi :: + * Default Microsoft SQL Server dialect, mapping the datetimeoffset types to a String on read. + */ +@DeveloperApi +case object MsSqlServerDialect extends JdbcDialect { + override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver") + override def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { + if (typeName.contains("datetimeoffset")) { + // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients + Some(StringType) + } else None + } +} + /** * :: DeveloperApi :: * Default Oracle dialect, mapping string/boolean on write to valid Oracle types. 
*/ @DeveloperApi case object OracleDialect extends JdbcDialect { - override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") - override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { if (dt == StringType && md.contains("maxlength")) { Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) @@ -280,9 +290,7 @@ case object OracleDialect extends JdbcDialect { */ @DeveloperApi case object NetezzaDialect extends JdbcDialect { - override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza") - override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { if (dt == StringType && md.contains("maxlength")) { Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) From dddc13762fc97a2dd8e407729db19cfee4cf6ccb Mon Sep 17 00:00:00 2001 From: ramulla Date: Sat, 26 Sep 2015 09:24:42 -0400 Subject: [PATCH 07/15] Updated a call with added parameter --- .../main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index e70957e1a444..1ae8c5b31817 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -171,8 +171,8 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect { dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption } - override def getJDBCType(dt: DataType): Option[JdbcType] = { - dialects.flatMap(_.getJDBCType(dt)).headOption + override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { + dialects.flatMap(_.getJDBCType(dt, md)).headOption } } From 5f532e8c0de5fab2f354d15be414d642ffc35aef Mon Sep 17 00:00:00 2001 From: ramulla Date: Mon, 28 Sep 2015 10:01:45 -0400 Subject: [PATCH 08/15] Fixed spark code style where comment exceeded 100 chars, and changed java.sql.Types.CHAR to java.sql.Types.VARCHAR when type used is VARCHAR --- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 7 ++++--- .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 8 ++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 2e5685ad3352..1830eb433119 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -166,13 +166,14 @@ object JdbcUtils extends Logging { val dialect = JdbcDialects.get(url) df.schema.fields foreach { field => { val name = field.name - /* If field metadata exists then call getJDBCType with field data type and metadata else just call with field data type + /* Modified getJDBCType with added parameter metadata * To override field metadata in Scala * import org.apache.spark.sql.types.MetadataBuilder * val metadata = new MetadataBuilder().putLong("maxlength", 10).build() * df.withColumn("colName", col("colName").as("colName", metadata) */ - val typ: String = dialect.getJDBCType(field.dataType, field.metadata).map(_.databaseTypeDefinition).getOrElse( + val typ: String = dialect.getJDBCType(field.dataType, field.metadata) + .map(_.databaseTypeDefinition).getOrElse( field.dataType match { case 
IntegerType => "INTEGER" case LongType => "BIGINT" @@ -204,7 +205,7 @@ object JdbcUtils extends Logging { properties: Properties = new Properties()) { val dialect = JdbcDialects.get(url) val nullTypes: Array[Int] = df.schema.fields.map { field => - /* If field metadata exists then call getJDBCType with field data type and metadata else just call with field data type + /* Modified getJDBCType with added parameter metadata * To override field metadata in Scala * import org.apache.spark.sql.types.MetadataBuilder * val metadata = new MetadataBuilder().putLong("maxlength", 10).build() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 1ae8c5b31817..491a04bc8cda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -258,7 +258,7 @@ case object DB2Dialect extends JdbcDialect { case object MsSqlServerDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver") override def getCatalystType( - sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { if (typeName.contains("datetimeoffset")) { // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients Some(StringType) @@ -275,7 +275,7 @@ case object OracleDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { if (dt == StringType && md.contains("maxlength")) { - Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) + Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.VARCHAR)) } else if (dt == StringType ) { Some(JdbcType("CLOB", java.sql.Types.CLOB)) } else if (dt == BooleanType ) { @@ -293,9 +293,9 @@ case object NetezzaDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza") override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = { if (dt == StringType && md.contains("maxlength")) { - Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.CHAR)) + Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.VARCHAR)) } else if (dt == StringType ) { - Some(JdbcType("VARCHAR(255)", java.sql.Types.CHAR)) + Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR)) } else if (dt == BinaryType ) { Some(JdbcType("BYTEINT", java.sql.Types.BINARY)) } else if (dt == BooleanType ) { From 27f118be2b6c4bf1ce34b40d79691f17856e0bba Mon Sep 17 00:00:00 2001 From: ramulla Date: Mon, 28 Sep 2015 14:25:41 -0400 Subject: [PATCH 09/15] Fixed sync with master issue --- .../datasources/jdbc/JdbcUtils.scala | 9 ++++++--- .../apache/spark/sql/jdbc/JdbcDialects.scala | 20 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 1830eb433119..6140d635e696 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -42,10 +42,13 @@ object JdbcUtils extends Logging { /** * Returns 
true if the table already exists in the JDBC database. */ - def tableExists(conn: Connection, table: String): Boolean = { + def tableExists(conn: Connection, url: String, table: String): Boolean = { + val dialect = JdbcDialects.get(url) + // Somewhat hacky, but there isn't a good way to identify whether a table exists for all - // SQL database systems, considering "table" could also include the database name. - Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess + // SQL database systems using JDBC meta data calls, considering "table" could also include + // the database name. Query used to find table exists can be overriden by the dialects. + Try(conn.prepareStatement(dialect.getTableExistsQuery(table)).executeQuery()).isSuccess } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 491a04bc8cda..972c3818bca9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -96,6 +96,17 @@ abstract class JdbcDialect { def quoteIdentifier(colName: String): String = { s""""$colName"""" } + + /** + * Get the SQL query that should be used to find if the given table exists. Dialects can + * override this method to return a query that works best in a particular database. + * @param table The name of the table. + * @return The SQL query to use for checking the table. + */ + def getTableExistsQuery(table: String): String = { + s"SELECT * FROM $table WHERE 1=0" + } + } /** @@ -209,6 +220,11 @@ case object PostgresDialect extends JdbcDialect { case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) case _ => None } + + override def getTableExistsQuery(table: String): String = { + s"SELECT 1 FROM $table LIMIT 1" + } + } /** @@ -233,6 +249,10 @@ case object MySQLDialect extends JdbcDialect { override def quoteIdentifier(colName: String): String = { s"`$colName`" } + + override def getTableExistsQuery(table: String): String = { + s"SELECT 1 FROM $table LIMIT 1" + } } /** From 44e1978530a9703ecad9fae90a7ea9ebb0693fd3 Mon Sep 17 00:00:00 2001 From: ramulla Date: Mon, 28 Sep 2015 16:31:04 -0400 Subject: [PATCH 10/15] Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType --- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index c72154e8fae5..7bd7a693e053 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{MetadataCleanerType, Utils} class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext { import testImplicits._ @@ -450,8 +450,8 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("DB2Dialect type mapping") { val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db") - assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") - assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == 
"CHAR(1)") + assert(db2Dialect.getJDBCType(StringType,null).map(_.databaseTypeDefinition).get == "CLOB") + assert(db2Dialect.getJDBCType(BooleanType,null).map(_.databaseTypeDefinition).get == "CHAR(1)") } test("table exists query by jdbc dialect") { From e605a11383f2a0c22003e3a3cdf349b26af9310f Mon Sep 17 00:00:00 2001 From: ramulla Date: Mon, 28 Sep 2015 17:00:35 -0400 Subject: [PATCH 11/15] Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7bd7a693e053..d71989b8cc11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -450,8 +450,8 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("DB2Dialect type mapping") { val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db") - assert(db2Dialect.getJDBCType(StringType,null).map(_.databaseTypeDefinition).get == "CLOB") - assert(db2Dialect.getJDBCType(BooleanType,null).map(_.databaseTypeDefinition).get == "CHAR(1)") + assert(db2Dialect.getJDBCType(StringType, null).map(_.databaseTypeDefinition).get == "CLOB") + assert(db2Dialect.getJDBCType(BooleanType, null).map(_.databaseTypeDefinition).get == "CHAR(1)") } test("table exists query by jdbc dialect") { From 4cae11b40df30b90689633e8d9085b87b528f05b Mon Sep 17 00:00:00 2001 From: ramulla Date: Tue, 29 Sep 2015 03:05:11 -0400 Subject: [PATCH 12/15] Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index d71989b8cc11..1bf303e75dbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -407,10 +407,10 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("Default jdbc dialect registration") { assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect) assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) - assert(JdbcDialects.get("jdbc:oracle://127.0.0.1/db") == OracleDialect) - assert(JdbcDialects.get("jdbc:netezza://127.0.0.1/db") == NetezzaDialect) assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect) + assert(JdbcDialects.get("jdbc:oracle://127.0.0.1/db") == OracleDialect) + assert(JdbcDialects.get("jdbc:netezza://127.0.0.1/db") == NetezzaDialect) assert(JdbcDialects.get("test.invalid") == NoopDialect) } From 4c2a7a43754485ad5771250ebaf8991a0137df0b Mon Sep 17 00:00:00 2001 From: ramulla Date: Tue, 29 Sep 2015 03:37:35 -0400 Subject: [PATCH 13/15] Updated JDBCDialects to save VARCHAR size to metadata --- .../apache/spark/sql/jdbc/JdbcDialects.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 972c3818bca9..ca7ff4316256 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -293,6 +293,15 @@ case object MsSqlServerDialect extends JdbcDialect {
 @DeveloperApi
 case object OracleDialect extends JdbcDialect {
   override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
+  override def getCatalystType(
+    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    if (sqlType == Types.VARCHAR && typeName.equals("VARCHAR")) {
+      //Save varchar size to metadata
+      md.putLong("maxlength", size)
+      Some(StringType)
+    } else None
+  }
+
   override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = {
     if (dt == StringType && md.contains("maxlength")) {
       Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.VARCHAR))
@@ -311,6 +320,15 @@ case object OracleDialect extends JdbcDialect {
 @DeveloperApi
 case object NetezzaDialect extends JdbcDialect {
   override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza")
+  override def getCatalystType(
+    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    if (sqlType == Types.VARCHAR && typeName.equals("VARCHAR")) {
+      //Save varchar size to metadata
+      md.putLong("maxlength", size)
+      Some(StringType)
+    } else None
+  }
+
   override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = {
     if (dt == StringType && md.contains("maxlength")) {
       Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.VARCHAR))

From a0cb024b74c602e531f51020cb1a1ee97f8238ff Mon Sep 17 00:00:00 2001
From: ramulla
Date: Tue, 29 Sep 2015 14:37:24 -0400
Subject: [PATCH 14/15] Updated JDBCDialects to save VARCHAR size to metadata

---
 .../main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index ca7ff4316256..2a822b997471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -296,7 +296,7 @@ case object OracleDialect extends JdbcDialect {
   override def getCatalystType(
     sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     if (sqlType == Types.VARCHAR && typeName.equals("VARCHAR")) {
-      //Save varchar size to metadata
+      // Save varchar size to metadata
       md.putLong("maxlength", size)
       Some(StringType)
     } else None
@@ -323,7 +323,7 @@ case object NetezzaDialect extends JdbcDialect {
   override def getCatalystType(
     sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     if (sqlType == Types.VARCHAR && typeName.equals("VARCHAR")) {
-      //Save varchar size to metadata
+      // Save varchar size to metadata
       md.putLong("maxlength", size)
       Some(StringType)
     } else None

From d50bdf796c96f827aa920c31dd3d5822414f178d Mon Sep 17 00:00:00 2001
From: ramulla
Date: Sun, 4 Oct 2015 11:18:57 -0400
Subject: [PATCH 15/15] Changed callers of getJDBCType to try the (DataType,
 Metadata) overload first and fall back to the DataType-only overload.
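
A round-trip usage sketch (illustrative only: the column "name", the length
100, and the df/url/connectionProperties values are assumptions, not part of
this patch):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

// Tag the string column so dialects that honor "maxlength" create the column
// as VARCHAR(100) instead of their default string type (e.g. CLOB).
val metadata = new MetadataBuilder().putLong("maxlength", 100).build()
val tagged = df.withColumn("name", col("name").as("name", metadata))

// Untagged columns fall back to getJDBCType(dataType), and then to the
// built-in defaults in JdbcUtils.schemaString.
tagged.write.jdbc(url, "people", connectionProperties)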
---
 .../datasources/jdbc/JdbcUtils.scala          | 52 ++++++++-----------
 1 file changed, 23 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 6140d635e696..f2b30f025222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -169,14 +169,10 @@ object JdbcUtils extends Logging {
     val dialect = JdbcDialects.get(url)
     df.schema.fields foreach { field => {
       val name = field.name
-      /* Modified getJDBCType with added parameter metadata
-       * To override field metadata in Scala
-       * import org.apache.spark.sql.types.MetadataBuilder
-       * val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
-       * df.withColumn("colName", col("colName").as("colName", metadata)
-       */
+      // Try getJDBCType with the field metadata, then without it
       val typ: String = dialect.getJDBCType(field.dataType, field.metadata)
-        .map(_.databaseTypeDefinition).getOrElse(
+        .map(_.databaseTypeDefinition).orElse(dialect.getJDBCType(field.dataType)
+        .map(_.databaseTypeDefinition)).getOrElse(
         field.dataType match {
           case IntegerType => "INTEGER"
           case LongType => "BIGINT"
@@ -208,29 +204,27 @@
       properties: Properties = new Properties()) {
     val dialect = JdbcDialects.get(url)
     val nullTypes: Array[Int] = df.schema.fields.map { field =>
-      /* Modified getJDBCType with added parameter metadata
-       * To override field metadata in Scala
-       * import org.apache.spark.sql.types.MetadataBuilder
-       * val metadata = new MetadataBuilder().putLong("maxlength", 10).build()
-       * df.withColumn("colName", col("colName").as("colName", metadata)
-       */
-      dialect.getJDBCType(field.dataType, field.metadata).map(_.jdbcNullType).getOrElse(
-        field.dataType match {
-          case IntegerType => java.sql.Types.INTEGER
-          case LongType => java.sql.Types.BIGINT
-          case DoubleType => java.sql.Types.DOUBLE
-          case FloatType => java.sql.Types.REAL
-          case ShortType => java.sql.Types.INTEGER
-          case ByteType => java.sql.Types.INTEGER
-          case BooleanType => java.sql.Types.BIT
-          case StringType => java.sql.Types.CLOB
-          case BinaryType => java.sql.Types.BLOB
-          case TimestampType => java.sql.Types.TIMESTAMP
-          case DateType => java.sql.Types.DATE
-          case t: DecimalType => java.sql.Types.DECIMAL
-          case _ => throw new IllegalArgumentException(
+      // Try getJDBCType with the field metadata, then without it
+      dialect.getJDBCType(field.dataType, field.metadata)
+        .map(_.jdbcNullType).orElse(dialect.getJDBCType(field.dataType)
+        .map(_.jdbcNullType)).getOrElse(
+        field.dataType match {
+          case IntegerType => java.sql.Types.INTEGER
+          case LongType => java.sql.Types.BIGINT
+          case DoubleType => java.sql.Types.DOUBLE
+          case FloatType => java.sql.Types.REAL
+          case ShortType => java.sql.Types.INTEGER
+          case ByteType => java.sql.Types.INTEGER
+          case BooleanType => java.sql.Types.BIT
+          case StringType => java.sql.Types.CLOB
+          case BinaryType => java.sql.Types.BLOB
+          case TimestampType => java.sql.Types.TIMESTAMP
+          case DateType => java.sql.Types.DATE
+          case t: DecimalType => java.sql.Types.DECIMAL
+          case _ => throw new IllegalArgumentException(
             s"Can't translate null value for field $field")
-        })
+        }
+      )
     }
 
     val rddSchema = df.schema