diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index c68e4dc4933b1..8579338d7e4e0 100644
--- a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -59,6 +59,9 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     ).executeUpdate()
     conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " +
       "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE keywords (`from` INT, `interval` FLOAT)").executeUpdate()
+    conn.prepareStatement("INSERT INTO keywords VALUES (23, 25.5)").executeUpdate()
   }
 
   test("Basic test") {
@@ -146,8 +149,10 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
     val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
     val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
+    val df4 = sqlContext.read.jdbc(jdbcUrl, "keywords", new Properties)
     df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
+    df4.write.jdbc(jdbcUrl, "keywordscopy", new Properties)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 69ba84646f088..31aea49f113b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -96,8 +96,12 @@ object JdbcUtils extends Logging {
   /**
    * Returns a PreparedStatement that inserts a row into table via conn.
    */
-  def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
-    val columns = rddSchema.fields.map(_.name).mkString(",")
+  def insertStatement(
+      conn: Connection,
+      table: String,
+      rddSchema: StructType,
+      dialect: JdbcDialect): PreparedStatement = {
+    val columns = rddSchema.fields.map(field => dialect.quoteIdentifier(field.name)).mkString(",")
     val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
     val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
     conn.prepareStatement(sql)
@@ -169,7 +173,7 @@ object JdbcUtils extends Logging {
     if (supportsTransactions) {
       conn.setAutoCommit(false) // Everything in the same db transaction.
     }
-    val stmt = insertStatement(conn, table, rddSchema)
+    val stmt = insertStatement(conn, table, rddSchema, dialect)
     try {
       var rowCount = 0
       while (iterator.hasNext) {
@@ -249,7 +253,7 @@ object JdbcUtils extends Logging {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
     df.schema.fields foreach { field => {
-      val name = field.name
+      val name = dialect.quoteIdentifier(field.name)
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
      sb.append(s", $name $typ $nullable")
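
Not part of the patch itself: below is a minimal, self-contained sketch of the quoting behavior this change introduces. quoteIdentifier here mirrors MySQLDialect's backtick quoting, and QuotingSketch, insertSql, and the table/column names are illustrative only, not names from the patch.

// Sketch, assuming MySQL-style backtick quoting of identifiers.
object QuotingSketch {
  // MySQL-style identifier quoting, as MySQLDialect.quoteIdentifier does.
  def quoteIdentifier(colName: String): String = s"`$colName`"

  // Builds INSERT text the same way the patched insertStatement does:
  // every column name is run through the dialect's quoting.
  def insertSql(table: String, fieldNames: Seq[String]): String = {
    val columns = fieldNames.map(quoteIdentifier).mkString(",")
    val placeholders = fieldNames.map(_ => "?").mkString(",")
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  }

  def main(args: Array[String]): Unit = {
    // Reserved words as column names, matching the new `keywords` test table.
    println(insertSql("keywords", Seq("from", "interval")))
    // Prints: INSERT INTO keywords (`from`,`interval`) VALUES (?,?)
    // Before the patch the statement would read (from,interval), which
    // MySQL rejects as a syntax error because both words are reserved.
  }
}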