From 8d499fe908c29f3b84236315a65e9221ae08cb14 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 16:28:16 -0800 Subject: [PATCH 1/9] Introduce insert, update, and upsert commands to the JdbcUtils class --- .../java/org/apache/spark/sql/SaveMode.java | 8 + .../apache/spark/sql/DataFrameWriter.scala | 11 +- .../datasources/jdbc/JdbcUtils.scala | 248 +++++++++++++++++- 3 files changed, 256 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java index 9665c3c46f901..b71e1d291af38 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java +++ b/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java @@ -29,6 +29,14 @@ public enum SaveMode { * @since 1.3.0 */ Append, + /** + * Update mode means that when saving a DataFrame to a data source, if data/table already exists, + * contents of the DataFrame are inserted where there is no conflict on an identifying key. In + * the case where a conflict exists, the additional columns associated with that key are updated. + * + * @since 1.3.0 + */ + Update, /** * Overwrite mode means that when saving a DataFrame to a data source, * if data/table already exists, existing data is expected to be overwritten by the contents of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 12b304623d30b..270da1d2baa30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -19,13 +19,13 @@ package org.apache.spark.sql import java.util.Properties -import scala.collection.JavaConverters._ - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} -import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} + +import scala.collection.JavaConverters._ /** * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, @@ -64,10 +64,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { this.mode = saveMode.toLowerCase match { case "overwrite" => SaveMode.Overwrite case "append" => SaveMode.Append + case "update" => SaveMode.Update case "ignore" => SaveMode.Ignore case "error" | "default" => SaveMode.ErrorIfExists case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. 
" + - "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.") + "Accepted save modes are 'overwrite', 'append', 'update', 'ignore', 'error'.") } this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index d3e1efc562777..97aebf19aca49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -17,17 +17,18 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.{Connection, Driver, DriverManager, PreparedStatement} +import java.sql.{Connection, Driver, DriverManager, PreparedStatement, Statement} import java.util.Properties -import scala.collection.JavaConverters._ -import scala.util.Try -import scala.util.control.NonFatal - import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext} + +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal /** * Util functions for JDBC tables. @@ -290,4 +291,239 @@ object JdbcUtils extends Logging { } } + /** + * Check whether a table exists in a given database + * + * @return + */ + @transient + def checkTableExists(targetDb: String, tableName: String): Boolean = { + val dbc: Connection = DriverManager.getConnection(targetDb) + val dbm = dbc.getMetaData() + // Check if the table exists. If it exists, perform an upsert. + // Otherwise, do a simple dataframe write to the DB + val tables = dbm.getTables(null, null, tableName, null) + val exists = tables.next() // Returns false if next does not exist + dbc.close() + exists + } + + // Provide a reasonable starting batch size for database operations. + private val DEFAULT_BATCH_SIZE: Int = 200 + + // Limit the number of database connections. Some DBs suffer when there are many open + // connections. + private val DEFAULT_MAX_CONNECTIONS: Int = 50 + + private val DEFAULT_ID_COLUMN: String = "id" + + /** + * Given an RDD of SQL queries to execute, connect to a database and perform a + * sequence of batched operations. + * Usies a maximum number of simultaneous connections as defined by "maxConnections". + * + * @param targetDb The database to connect to, provided as a jdbc URL, e.g. 
+   *                 "jdbc:postgresql://192.168.0.1:5432/MY_DB_NAME?user=MY_USER&password=PASSWORD"
+   * @param statements The SQL statements to execute (as strings)
+   * @param batchSize The batch size to use, default 200
+   * @param maxConnections Maximum number of simultaneous connections to open to the database
+   */
+  private def executeStatements(targetDb: String,
+                                statements: Dataset[String],
+                                batchSize: Int = DEFAULT_BATCH_SIZE,
+                                maxConnections: Int = DEFAULT_MAX_CONNECTIONS): Unit = {
+    // To avoid overloading database coalesce to a set number of partitions if necessary
+    val coalesced = if (statements.rdd.getNumPartitions > maxConnections) {
+      statements.coalesce(maxConnections)
+    } else {
+      statements
+    }
+
+    coalesced.mapPartitions(Iterator(_)).foreach { batch =>
+      val dbc: Connection = DriverManager.getConnection(targetDb)
+      dbc.setAutoCommit(false)
+
+      val st: Statement = dbc.createStatement()
+
+      try {
+        batch.grouped(batchSize).foreach { rowBatch =>
+          rowBatch.foreach { statement =>
+            st.addBatch(statement)
+          }
+          st.executeBatch()
+          dbc.commit()
+        }
+      }
+      finally {
+        dbc.close()
+      }
+    }
+  }
+
+  /**
+   * Given a row of a DataFrame, generate an insert query
+   *
+   * @param row A row of a dataframe
+   * @param schema The schema for the dataframe
+   * @param tableName The name of the table to update
+   * @return A SQL statement that inserts the referenced row into the table.
+   */
+  def genInsertScript(row: Row, schema: StructType, tableName: String): String = {
+    val schemaString = schema.map(s => s.name).reduce(_ + ", " + _)
+
+    val valString = {
+      row.toSeq.map(v => "'" + v.toString.replaceAll("'", "''") + "'").reduce(_ + "," + _)
+    }
+
+    val insert = s"INSERT INTO $tableName ($schemaString) VALUES ($valString);"
+    insert
+  }
+
+  /**
+   * Given a row of a DataFrame and a specified key, generate an update query
+   *
+   * @param row A row of a dataframe
+   * @param schema The schema for the dataframe
+   * @param tableName The name of the table to update
+   * @param keyField The name of the column that can serve as a primary key
+   * @return A SQL statement that will update the referenced row in the table.
+   */
+  def genUpdateScript(row: Row, schema: StructType, tableName: String, keyField: String): String = {
+    val keyVal = row.get(schema.fieldIndex(keyField))
+    val zipped = row.toSeq.zip(schema.map(s => s.name))
+
+    val valString = zipped.flatMap(s => {
+      // Value fields are wrapped in single quotes, so escape any single quotes in the value
+      val noQuotes: String = s._1.toString.replaceAll("'", "''")
+      if (!s._2.equals(keyField)) Seq(s"${s._2}='$noQuotes'") else Seq()
+    }).reduce(_ + ",\n" + _)
+
+    val update = s"UPDATE $tableName set \n $valString \n WHERE $keyField = $keyVal;"
+    update
+  }
+
+  /**
+   * Write a given DataFrame into a provided database via JDBC by performing an update command.
+   * If the database contains a row with a matching id, then all other columns will be updated.
+   * If the database does not contain a row with a matching id, an insert is performed instead.
+   * Note: This command expects the database to contain an indexed column to use for the update;
+   * the default is "id".
+   * TODO: Add support for arbitrary length keys
+   *
+   * @param df The dataframe to write to the database.
+   * @param targetDb The database to update provided as a jdbc URL, e.g.
+ * "jdbc:postgresql://192.168.0.1:5432/MY_DB_NAME?user=MY_USER&password=PASSWORD" + * @param tableName The name of the table to update + * @param batchSize The batch size to use, default 200 + * @param maxConnections Maximum number of simultaneous connections to open to the database + * @param idColumn The column to use as the primary key for resolving conflicts, + * default is "id". + */ + @transient + def updateById(df: DataFrame, + targetDb: String, + tableName: String, + batchSize: Int = DEFAULT_BATCH_SIZE, + maxConnections: Int = DEFAULT_MAX_CONNECTIONS, + idColumn: String = DEFAULT_ID_COLUMN): Unit = { + val schema = df.schema + val tableExists = checkTableExists(targetDb, tableName) + + if (!tableExists) { + throw new NotImplementedError("Adding data to a non-existing table is not yet supported.") + } + + val statements = df.map(row => { + genUpdateScript(row, schema, tableName, idColumn) + }) + + executeStatements(targetDb, statements, batchSize, maxConnections) + } + + /** + * Insert a given DataFrame into a provided database via JDBC. + * + * @param df The dataframe to write to the database. + * @param targetDb The database to update provided as a jdbc URL, e.g. + * "jdbc:postgresql://192.168.0.1:5432/MY_DB_NAME?user=MY_USER&password=PASSWORD" + * @param tableName The name of the table to update + * @param batchSize The batch size to use, default 200 + * @param maxConnections Maximum number of simultaneous connections to open to the database + */ + @transient + def insert(df: DataFrame, + targetDb: String, + tableName: String, + batchSize: Int = 200, + maxConnections: Int = DEFAULT_MAX_CONNECTIONS): Unit = { + val schema = df.schema + val tableExists = checkTableExists(targetDb, tableName) + + if (!tableExists) { + throw new NotImplementedError("Adding data to a non-existing table is not yet supported.") + } + + val statements = df.map(row => { + genInsertScript(row, schema, tableName) + }) + + executeStatements(targetDb, statements, batchSize, maxConnections) + } + + + /** + * Perform an upsert of a DataFrame to a given table. + * Reads an existing table to determine whether any records need to be updated, performs an update + * for any existing records, otherwise performs an insert. + * + * Because an update is an expensive operation, the most efficient way of performing an update on + * a table is to first identify which records should be updated and which can be inserted. If + * possible, an index on a single column should be used to perform the update, for example, an + * "id" column. Performing an update on a multi-field index is even more computationally + * expensive. + * + * For improved performance, remove any indices from the table being updated (except for the + * index on the id column) and restore them after the update. + * + * @param sqlContext The active SQL Context + * @param df The dataset to write to the database. + * @param targetDb The database to update provided as a jdbc URL, e.g. + * "jdbc:postgresql://192.168.0.1:5432/MY_DB_NAME?user=MY_USER&password=PASSWORD" + * @param properties JDBC connection properties. + * @param tableName The name of the table to update + * @param primaryKeys A set representing the primary key for a database - the combination of + * column names that uniquely identifies a row in the dataframe. + * E.g. 
Set("first_name", "last_name", "address") + * @param batchSize The batch size to use, default 200 + * @param maxConnections Maximum number of simultaneous connections to open to the database + * @param idColumn The column to use as the key for resolving conflicts, default is "id" + */ + def upsert(sqlContext: SQLContext, + df: DataFrame, + targetDb: String, + properties: Properties, + tableName: String, + primaryKeys: Set[String], + batchSize: Int = DEFAULT_BATCH_SIZE, + maxConnections: Int = DEFAULT_MAX_CONNECTIONS, + idColumn: String = "id"): Unit = { + val storedDb = sqlContext.read.jdbc(targetDb, tableName, properties) + + // Determine rows to upsert based on a key match in the database + val toUpsert = df.join(storedDb, primaryKeys.toSeq, "inner").select(df("*"), storedDb("id")) + + // Insert those rows where there is no matching entry in the database already + // Do a select to ensure that columns are in the same order for except + // Note: we need to also get rid of timestamps for this comparison + val upsertKeys = toUpsert.select(primaryKeys.map(col).toSeq: _*) + val primaryKeysToInsert = df.select(primaryKeys.map(col).toSeq: _*).except(upsertKeys) + val toInsert = primaryKeysToInsert.join(df, primaryKeys.toSeq, "left_outer") + + // Only perform an update if there are overlapping elements + if (toUpsert.count() > 0) { + updateById(toUpsert, targetDb, tableName, batchSize, maxConnections, idColumn) + } + + insert(toInsert, targetDb, tableName, batchSize, maxConnections) + } } From 89cef373077283627cc896dce4ab95c9d5aa41de Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 16:32:49 -0800 Subject: [PATCH 2/9] Extra line --- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 97aebf19aca49..a446c722bcdd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -294,7 +294,7 @@ object JdbcUtils extends Logging { /** * Check whether a table exists in a given database * - * @return + * @return True if the table exists. */ @transient def checkTableExists(targetDb: String, tableName: String): Boolean = { @@ -470,7 +470,6 @@ object JdbcUtils extends Logging { executeStatements(targetDb, statements, batchSize, maxConnections) } - /** * Perform an upsert of a DataFrame to a given table. 
* Reads an existing table to determine whether any records need to be updated, performs an update From a64719b2c0b687cbe0b854d4a0c5e6e02f75a0bc Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 16:39:13 -0800 Subject: [PATCH 3/9] Reverted changes to df writer --- .../org/apache/spark/sql/DataFrameWriter.scala | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 533cde8b697f4..ff1f0177e8ba0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -19,15 +19,6 @@ package org.apache.spark.sql import java.util.Properties -<<<<<<< HEAD -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable -import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils -import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} - -import scala.collection.JavaConverters._ -======= import scala.collection.JavaConverters._ import org.apache.spark.annotation.InterfaceStability @@ -39,7 +30,6 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType ->>>>>>> upstream/master /** * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, @@ -79,11 +69,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { this.mode = saveMode.toLowerCase match { case "overwrite" => SaveMode.Overwrite case "append" => SaveMode.Append - case "update" => SaveMode.Update case "ignore" => SaveMode.Ignore case "error" | "default" => SaveMode.ErrorIfExists case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + - "Accepted save modes are 'overwrite', 'append', 'update', 'ignore', 'error'.") + "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.") } this } From ca494ebdf9110b67c96fc1c3df8463a4d63a56da Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 16:39:46 -0800 Subject: [PATCH 4/9] Reverted changes to savemode --- sql/core/src/main/java/org/apache/spark/sql/SaveMode.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java index 0ce012417c254..1c3c9794fb6bb 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java +++ b/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java @@ -32,14 +32,6 @@ public enum SaveMode { * @since 1.3.0 */ Append, - /** - * Update mode means that when saving a DataFrame to a data source, if data/table already exists, - * contents of the DataFrame are inserted where there is no conflict on an identifying key. In - * the case where a conflict exists, the additional columns associated with that key are updated. 
- * - * @since 1.3.0 - */ - Update, /** * Overwrite mode means that when saving a DataFrame to a data source, * if data/table already exists, existing data is expected to be overwritten by the contents of From 6a2cb054c12223460c0c39fbe9335c7b198b4fee Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 16:55:09 -0800 Subject: [PATCH 5/9] Fixed issue using mapPartitions --- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 79c7e988df983..f23f1bae5a0f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -761,7 +761,7 @@ object JdbcUtils extends Logging { * @param maxConnections Maximum number of simultaneous connections to open to the database */ private def executeStatements(targetDb: String, - statements: Dataset[String], + statements: DataFrame, batchSize: Int = DEFAULT_BATCH_SIZE, maxConnections: Int = DEFAULT_MAX_CONNECTIONS): Unit = { // To avoid overloading database coalesce to a set number of partitions if necessary @@ -780,7 +780,7 @@ object JdbcUtils extends Logging { try { batch.grouped(batchSize).foreach { rowBatch => rowBatch.foreach { statement => - st.addBatch(statement) + st.addBatch(statement.getString(0)) } st.executeBatch() dbc.commit() @@ -865,9 +865,7 @@ object JdbcUtils extends Logging { throw new NotImplementedError("Adding data to a non-existing table is not yet supported.") } - val statements = df.map(row => { - genUpdateScript(row, schema, tableName, idColumn) - }) + val statements = df.map(genUpdateScript(_, schema, tableName, idColumn)).toDF() executeStatements(targetDb, statements, batchSize, maxConnections) } @@ -895,9 +893,7 @@ object JdbcUtils extends Logging { throw new NotImplementedError("Adding data to a non-existing table is not yet supported.") } - val statements = df.map(row => { - genInsertScript(row, schema, tableName) - }) + val statements = df.map(genInsertScript(_, schema, tableName)).toDF() executeStatements(targetDb, statements, batchSize, maxConnections) } From 69f69396d67793bc2b9f27cc9a459e21b0a18522 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 17:07:48 -0800 Subject: [PATCH 6/9] Added spark implicits import --- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index f23f1bae5a0f6..b55349a0d405a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -858,6 +858,7 @@ object JdbcUtils extends Logging { batchSize: Int = DEFAULT_BATCH_SIZE, maxConnections: Int = DEFAULT_MAX_CONNECTIONS, idColumn: String = DEFAULT_ID_COLUMN): Unit = { + import df.sparkSession.implicits._ val schema = df.schema val tableExists = checkTableExists(targetDb, tableName) @@ -886,6 +887,7 @@ object JdbcUtils extends Logging { tableName: String, batchSize: Int = 200, maxConnections: Int = DEFAULT_MAX_CONNECTIONS): Unit = { + import df.sparkSession.implicits._ val 
schema = df.schema val tableExists = checkTableExists(targetDb, tableName) @@ -934,6 +936,7 @@ object JdbcUtils extends Logging { batchSize: Int = DEFAULT_BATCH_SIZE, maxConnections: Int = DEFAULT_MAX_CONNECTIONS, idColumn: String = "id"): Unit = { + import df.sparkSession.implicits._ val storedDb = sqlContext.read.jdbc(targetDb, tableName, properties) // Determine rows to upsert based on a key match in the database From 7938277fbf0511db20d859e1b53c580376b5f13d Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 17:19:12 -0800 Subject: [PATCH 7/9] Added implicit import --- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index b55349a0d405a..64437e94f6abb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -20,13 +20,9 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Statement} import java.util.Properties -import scala.collection.JavaConverters._ -import scala.util.Try -import scala.util.control.NonFatal - +import org.apache.spark.TaskContext import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging -import org.apache.spark.TaskContext import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder @@ -38,6 +34,10 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.NextIterator +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal + /** * Util functions for JDBC tables. 
*/ @@ -764,6 +764,7 @@ object JdbcUtils extends Logging { statements: DataFrame, batchSize: Int = DEFAULT_BATCH_SIZE, maxConnections: Int = DEFAULT_MAX_CONNECTIONS): Unit = { + import statements.sparkSession.implicits._ // To avoid overloading database coalesce to a set number of partitions if necessary val coalesced = if (statements.rdd.getNumPartitions > maxConnections) { statements.coalesce(maxConnections) @@ -936,7 +937,6 @@ object JdbcUtils extends Logging { batchSize: Int = DEFAULT_BATCH_SIZE, maxConnections: Int = DEFAULT_MAX_CONNECTIONS, idColumn: String = "id"): Unit = { - import df.sparkSession.implicits._ val storedDb = sqlContext.read.jdbc(targetDb, tableName, properties) // Determine rows to upsert based on a key match in the database From 56545ed88f665ed57a50a8c5d114c6ae8130eab3 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 Jan 2017 19:00:18 -0800 Subject: [PATCH 8/9] Import order --- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 64437e94f6abb..91b3a639cd641 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -20,6 +20,10 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Statement} import java.util.Properties +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal + import org.apache.spark.TaskContext import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging @@ -34,10 +38,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.NextIterator -import scala.collection.JavaConverters._ -import scala.util.Try -import scala.util.control.NonFatal - /** * Util functions for JDBC tables. 
*/ @@ -764,7 +764,6 @@ object JdbcUtils extends Logging { statements: DataFrame, batchSize: Int = DEFAULT_BATCH_SIZE, maxConnections: Int = DEFAULT_MAX_CONNECTIONS): Unit = { - import statements.sparkSession.implicits._ // To avoid overloading database coalesce to a set number of partitions if necessary val coalesced = if (statements.rdd.getNumPartitions > maxConnections) { statements.coalesce(maxConnections) From c6af861b8d1f9a9c72cc6803e417df30148d93ac Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 24 Jan 2017 10:40:22 -0800 Subject: [PATCH 9/9] Added implicit --- .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 91b3a639cd641..f9342d7eb6582 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -764,6 +764,8 @@ object JdbcUtils extends Logging { statements: DataFrame, batchSize: Int = DEFAULT_BATCH_SIZE, maxConnections: Int = DEFAULT_MAX_CONNECTIONS): Unit = { + import statements.sparkSession.implicits._ + // To avoid overloading database coalesce to a set number of partitions if necessary val coalesced = if (statements.rdd.getNumPartitions > maxConnections) { statements.coalesce(maxConnections)
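For reference, below is a minimal usage sketch of the API this series introduces, assuming a Spark build that includes these patches and that JdbcUtils is visible to the calling code. The master setting, connection URL, table name ("people"), and column names ("id", "name") are illustrative placeholders, not values taken from the patches.

  import java.util.Properties

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

  object JdbcUpsertExample {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("jdbc-upsert-example")
        .master("local[*]") // placeholder; use your cluster's master in practice
        .getOrCreate()
      import spark.implicits._

      // A toy DataFrame keyed by a single "id" column, matching the default idColumn.
      val people = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

      // Placeholder connection details; replace with real values.
      val url = "jdbc:postgresql://localhost:5432/MY_DB_NAME?user=MY_USER&password=PASSWORD"
      val props = new Properties()

      // The target table must already exist; the patch raises NotImplementedError otherwise.
      // Rows whose primary key is already present are updated (genUpdateScript); the rest
      // are inserted (genInsertScript).
      JdbcUtils.upsert(
        spark.sqlContext,
        people,
        targetDb = url,
        properties = props,
        tableName = "people",
        primaryKeys = Set("id"),
        batchSize = 200,
        maxConnections = 50,
        idColumn = "id")

      spark.stop()
    }
  }

Here primaryKeys only decides which rows are classified as updates versus inserts, while idColumn is the single indexed column the generated UPDATE statements actually match on, per the updateById scaladoc.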