From 88935c5b0eb371476c4f27be26454449fb072e7f Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 7 Apr 2016 20:25:33 -0700
Subject: [PATCH 1/9] fix issue where a column name contains a space

---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 13 +++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala |  9 ++++++++-
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b7ff5f72427a4..8982c413586ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -97,7 +97,7 @@ object JdbcUtils extends Logging {
    * Returns a PreparedStatement that inserts a row into table via conn.
    */
   def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
-    val columns = rddSchema.fields.map(_.name).mkString(",")
+    val columns = rddSchema.fields.map(f => escapeColumnName(f.name)).mkString(",")
     val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
     val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
     conn.prepareStatement(sql)
@@ -245,6 +245,15 @@ object JdbcUtils extends Logging {
     Array[Byte]().iterator
   }
 
+  /**
+   * The utility to add backtick if the column name has space
+   * @param columnName the input column name
+   * @return the escaped column name, add backtick if name contains space
+   */
+  private def escapeColumnName(columnName: String): String = {
+    if (columnName.contains(" ")) s"`$columnName`" else columnName
+  }
+
   /**
    * Compute the schema string for this RDD.
    */
@@ -252,7 +261,7 @@ object JdbcUtils extends Logging {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
     df.schema.fields foreach { field => {
-      val name = field.name
+      val name = escapeColumnName(field.name)
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index e23ee6693133b..bbaac8c727523 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -52,7 +52,8 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
     conn1.prepareStatement("drop table if exists test.people1").executeUpdate()
     conn1.prepareStatement(
-      "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+      "create table test.people1 (name TEXT(32) NOT NULL, `the id` INTEGER NOT NULL)")
+      .executeUpdate()
     conn1.commit()
 
     sql(
@@ -151,4 +152,10 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
     assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   }
+
+  test("SPARK-14460: Insert into table with column containing space") {
+    val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    df.write.insertInto("PEOPLE1")
+    assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+  }
 }
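Patch 1 fixes the broken INSERT by backtick-quoting any column name that contains a space. A minimal standalone sketch of the SQL it generates — the table name and the `println` are illustrative, not the patch itself:

```scala
import org.apache.spark.sql.types._

// Patch 1's naive rule: backtick-quote only names that contain a space.
def escapeColumnName(columnName: String): String =
  if (columnName.contains(" ")) s"`$columnName`" else columnName

val rddSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("the id", IntegerType)))

val columns = rddSchema.fields.map(f => escapeColumnName(f.name)).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
println(s"INSERT INTO TEST.PEOPLE1 ($columns) VALUES ($placeholders)")
// Before the fix: INSERT INTO TEST.PEOPLE1 (name,the id) VALUES (?,?)    -- invalid SQL
// After the fix:  INSERT INTO TEST.PEOPLE1 (name,`the id`) VALUES (?,?)
```

The limitation, addressed in the next patch, is that backticks are a MySQL/H2 convention; ANSI SQL (and e.g. PostgreSQL) quotes identifiers with double quotes, so hard-coding backticks is not portable across databases.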
From 51b7f8615aed79f60176ae08430c98c9b35c91bf Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 7 Apr 2016 21:24:51 -0700
Subject: [PATCH 2/9] use dialect to properly handle quoting

---
 .../apache/spark/sql/DataFrameWriter.scala |  7 +++----
 .../datasources/jdbc/JdbcUtils.scala       | 21 +++++++++++--------
 2 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 54d250867fbb3..07d35b4ebf3c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -20,9 +20,7 @@ package org.apache.spark.sql
 import java.util.Properties
 
 import scala.collection.JavaConverters._
-
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -31,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources.HadoopFsRelation
 import org.apache.spark.util.Utils
@@ -496,7 +495,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     // connectionProperties should override settings in extraOptions
     props.putAll(connectionProperties)
     val conn = JdbcUtils.createConnectionFactory(url, props)()
-
+    val dialect = JdbcDialects.get(url)
     try {
       var tableExists = JdbcUtils.tableExists(conn, url, table)
@@ -515,7 +514,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
       // Create the table if the table didn't exist.
       if (!tableExists) {
-        val schema = JdbcUtils.schemaString(df, url)
+        val schema = JdbcUtils.schemaString(dialect, df, url)
         val sql = s"CREATE TABLE $table ($schema)"
         val statement = conn.createStatement
         try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8982c413586ad..01e10dfafa6ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -96,8 +96,9 @@ object JdbcUtils extends Logging {
   /**
    * Returns a PreparedStatement that inserts a row into table via conn.
    */
-  def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
-    val columns = rddSchema.fields.map(f => escapeColumnName(f.name)).mkString(",")
+  def insertStatement(dialect: JdbcDialect, conn: Connection, table: String, rddSchema: StructType)
+    : PreparedStatement = {
+    val columns = rddSchema.fields.map(f => quoteColumnName(dialect, f.name)).mkString(",")
     val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
     val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
     conn.prepareStatement(sql)
@@ -169,7 +170,7 @@ object JdbcUtils extends Logging {
       if (supportsTransactions) {
         conn.setAutoCommit(false) // Everything in the same db transaction.
       }
-      val stmt = insertStatement(conn, table, rddSchema)
+      val stmt = insertStatement(dialect, conn, table, rddSchema)
       try {
         var rowCount = 0
         while (iterator.hasNext) {
@@ -246,22 +247,24 @@ object JdbcUtils extends Logging {
   }
 
   /**
-   * The utility to add backtick if the column name has space
+   * The utility to add quote to the column name based on its dialect
+   *
    * @param columnName the input column name
-   * @return the escaped column name, add backtick if name contains space
+   * @param dialect the JDBC dialect
+   * @return the quoted column name
    */
-  private def escapeColumnName(columnName: String): String = {
-    if (columnName.contains(" ")) s"`$columnName`" else columnName
+  private def quoteColumnName(dialect: JdbcDialect, columnName: String): String = {
+    dialect.quoteIdentifier(columnName)
   }
 
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(df: DataFrame, url: String): String = {
+  def schemaString(dialect: JdbcDialect, df: DataFrame, url: String): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
     df.schema.fields foreach { field => {
-      val name = escapeColumnName(field.name)
+      val name = quoteColumnName(dialect, field.name)
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
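Patch 2 replaces the hard-coded backtick with the dialect's own quoting rule via `JdbcDialect.quoteIdentifier`. A short sketch of what the indirection buys — the URLs are illustrative, and the exact quote character is whatever each dialect emits (backticks for MySQL, ANSI double quotes for the default dialect):

```scala
import org.apache.spark.sql.jdbc.JdbcDialects

// The JDBC url selects a dialect; the dialect decides how to quote.
val mysql   = JdbcDialects.get("jdbc:mysql://localhost/test")
val generic = JdbcDialects.get("jdbc:h2:mem:testdb")

println(mysql.quoteIdentifier("the id"))   // `the id`
println(generic.quoteIdentifier("the id")) // "the id"
```

Note that the new `schemaString(dialect, df, url)` still computes `val dialect = JdbcDialects.get(url)` inside the method, shadowing the parameter it was just given; patch 7 resolves this by dropping the parameter again.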
From 9ffb4621904f2bdaad8e1f4d2403ca01ff7dbd98 Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 7 Apr 2016 23:29:33 -0700
Subject: [PATCH 3/9] fix scala style check

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 07d35b4ebf3c1..c10f6b7e1cc77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql
 import java.util.Properties
 
 import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

From d0c86eaa46368e90697ae9cf2e8b3f65adcfef41 Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 7 Apr 2016 23:32:38 -0700
Subject: [PATCH 4/9] adjust param order

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 01e10dfafa6ef..cef67db499c23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -248,9 +248,8 @@ object JdbcUtils extends Logging {
   }
 
   /**
    * The utility to add quote to the column name based on its dialect
-   *
-   * @param columnName the input column name
    * @param dialect the JDBC dialect
+   * @param columnName the input column name
    * @return the quoted column name
    */
   private def quoteColumnName(dialect: JdbcDialect, columnName: String): String = {

From b70f21f91791f1eeb0dee15cc6f95c477f508828 Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 14 Apr 2016 15:13:53 -0700
Subject: [PATCH 5/9] fix scala style check

---
 .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 775bcaff91c3a..9f98fe184e496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -262,7 +262,7 @@ object JdbcUtils extends Logging {
   def schemaString(dialect: JdbcDialect, df: DataFrame, url: String): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
-    df.schema.fields foreach { field => {
+    df.schema.fields foreach { field =>
       val name = quoteColumnName(dialect, field.name)
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
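Patches 3 through 5 are mechanical cleanups: restoring the blank lines between import groups, reordering the Javadoc `@param` tags to match the argument order, and dropping the redundant inner braces in the `foreach` closure. For context, a self-contained sketch of the schema string this code path builds — the local `quote` helper and the abbreviated type mapping stand in for `dialect.quoteIdentifier` and `getJdbcType`:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("the id", IntegerType, nullable = false)))

// Stand-in for dialect.quoteIdentifier: ANSI-style double quotes.
def quote(name: String): String = "\"" + name + "\""

val ddlColumns = schema.fields.map { field =>
  val typ = field.dataType match {
    case StringType  => "TEXT"
    case IntegerType => "INTEGER"
    case other       => other.simpleString.toUpperCase
  }
  val nullable = if (field.nullable) "" else " NOT NULL"
  s"${quote(field.name)} $typ$nullable"
}.mkString(", ")

println(s"CREATE TABLE TEST.PEOPLE1 ($ddlColumns)")
// CREATE TABLE TEST.PEOPLE1 ("name" TEXT, "the id" INTEGER NOT NULL)
```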
From 715777adff9250014e2d42f670765476ae612d74 Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 14 Apr 2016 19:57:54 -0700
Subject: [PATCH 6/9] add test case for order as column name

---
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index bbaac8c727523..4147fb415f2a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -54,6 +54,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     conn1.prepareStatement(
       "create table test.people1 (name TEXT(32) NOT NULL, `the id` INTEGER NOT NULL)")
       .executeUpdate()
+    conn1.prepareStatement(
+      "create table test.orders (`order` TEXT(32) NOT NULL, `order id` INTEGER NOT NULL)")
+      .executeUpdate()
     conn1.commit()
 
     sql(
@@ -69,6 +72,13 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       |USING org.apache.spark.sql.jdbc
       |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
     """.stripMargin.replaceAll("\n", " "))
+
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE ORDERS
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url1', dbtable 'TEST.ORDERS', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
   }
 
   after {
@@ -157,5 +167,8 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     df.write.insertInto("PEOPLE1")
     assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+
+    df.write.insertInto("ORDERS")
+    assert(2 === sqlContext.read.jdbc(url1, "TEST.ORDERS", properties).count)
   }
 }

From d39ae2d33d3ff5e04c363bc7a873e098770de821 Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 14 Apr 2016 20:42:51 -0700
Subject: [PATCH 7/9] fix dialect issue

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +--
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala      | 3 ++-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index c10f6b7e1cc77..03e8795901a81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -497,7 +497,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     // connectionProperties should override settings in extraOptions
     props.putAll(connectionProperties)
     val conn = JdbcUtils.createConnectionFactory(url, props)()
-    val dialect = JdbcDialects.get(url)
     try {
       var tableExists = JdbcUtils.tableExists(conn, url, table)
@@ -516,7 +515,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
       // Create the table if the table didn't exist.
      if (!tableExists) {
-        val schema = JdbcUtils.schemaString(dialect, df, url)
+        val schema = JdbcUtils.schemaString(df, url)
         val sql = s"CREATE TABLE $table ($schema)"
         val statement = conn.createStatement
         try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 9f98fe184e496..f745f00f4a881 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -259,10 +259,11 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(dialect: JdbcDialect, df: DataFrame, url: String): String = {
+  def schemaString(df: DataFrame, url: String): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
     df.schema.fields foreach { field =>
+
       val name = quoteColumnName(dialect, field.name)
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
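Patch 6's new TEST.ORDERS table broadens the test beyond spaces: `order` is a SQL reserved word and `order id` combines a reserved word with a space, so neither parses unquoted. A usage sketch in the suite's style — `arr2x2` and `schema2` are the suite's existing fixtures, and the assumption here is that `insertInto` projects the data onto the target table's schema, so the prepared INSERT lists the quoted `order` and `order id` columns:

```scala
// Without dialect quoting, the generated
//   INSERT INTO TEST.ORDERS (order,order id) VALUES (?,?)
// is unparsable; with it, both names are safely quoted.
val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
df.write.insertInto("ORDERS")
assert(2 === sqlContext.read.jdbc(url1, "TEST.ORDERS", properties).count)
```

Patch 7 then simplifies the API back to `schemaString(df, url)`: the dialect is derived from the url inside the method, which removes both the shadowed local introduced in patch 2 and the extra parameter threaded through DataFrameWriter.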
From 7ca90388a3fa6a24afeb9c540603cea7577fb05f Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 14 Apr 2016 20:44:05 -0700
Subject: [PATCH 8/9] revert line removal

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03e8795901a81..0e8714e89732c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -497,6 +497,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     // connectionProperties should override settings in extraOptions
     props.putAll(connectionProperties)
     val conn = JdbcUtils.createConnectionFactory(url, props)()
+
     try {
       var tableExists = JdbcUtils.tableExists(conn, url, table)

From 675700fff3f07fe5c4dd3e8caeeda82788c12d74 Mon Sep 17 00:00:00 2001
From: bomeng
Date: Thu, 14 Apr 2016 20:44:39 -0700
Subject: [PATCH 9/9] revert line removal

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 0e8714e89732c..538c22fd8f867 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -497,7 +497,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     // connectionProperties should override settings in extraOptions
     props.putAll(connectionProperties)
     val conn = JdbcUtils.createConnectionFactory(url, props)()
-    
+
     try {
       var tableExists = JdbcUtils.tableExists(conn, url, table)
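Patches 8 and 9 only restore the blank line after `createConnectionFactory` — the first restore apparently carried trailing whitespace, hence the immediate follow-up. With the series applied, an end-to-end sketch of the fixed behavior — the in-memory H2 url and table name are hypothetical, chosen to mirror the suite's setup:

```scala
import java.util.Properties

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val url = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("the id", IntegerType)))
val df = sqlContext.createDataFrame(
  sparkContext.parallelize(Seq(Row("mary", 1), Row("fred", 2))), schema)

// Both the CREATE TABLE that write.jdbc issues and the INSERT it prepares
// now quote "the id"; before the fix this call failed with a SQL syntax error.
df.write.jdbc(url, "TEST.SPACED", new Properties)
assert(2 === sqlContext.read.jdbc(url, "TEST.SPACED", new Properties).count)
```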