From 57b10ecd083a3368630bf5e121b4b867cf233322 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 12 Nov 2016 20:07:39 -0800 Subject: [PATCH 1/7] [SPARK-18413][SQL] Control the number of JDBC connections by repartition with `numPartition` JDBCOption --- docs/sql-programming-guide.md | 7 +++++++ .../sql/execution/datasources/jdbc/JDBCOptions.scala | 6 ++++++ .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 9 ++++++++- 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index b9be7a7545ef8..677846da17cb2 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1086,6 +1086,13 @@ the following case-sensitive options: + + maxConnections + + The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed. This option applies only to writing. It defaults to the number of partitions of RDD. + + + isolationLevel diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 7f419b5788c4f..fb2eff76e950d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -122,6 +122,11 @@ class JDBCOptions( case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE } + // the maximum number of connections + val maxConnections = parameters.getOrElse(JDBC_MAX_CONNECTIONS, null) + require(maxConnections == null || maxConnections.toInt > 0, + s"Invalid value `$maxConnections` for parameter `$JDBC_MAX_CONNECTIONS`. " + + s"The minimum value is 1.") } object JDBCOptions { @@ -144,4 +149,5 @@ object JDBCOptions { val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions") val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") + val JDBC_MAX_CONNECTIONS = newOption("maxConnections") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 41edb6511c2ce..30ccd293cd65f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -667,7 +667,14 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel - df.foreachPartition(iterator => savePartition( + val maxConnections = options.maxConnections + val repartitionedDF = + if (maxConnections != null && maxConnections.toInt < df.rdd.getNumPartitions) { + df.coalesce(maxConnections.toInt) + } else { + df + } + repartitionedDF.foreachPartition(iterator => savePartition( getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel) ) } From bf1d6bdc74e0f22e797d092eac07678b0cf5c2da Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 17 Nov 2016 08:15:31 -0800 Subject: [PATCH 2/7] Add a negative test case. --- .../sql/execution/datasources/jdbc/JDBCOptions.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index fb2eff76e950d..25bccaf3c4b8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -126,7 +126,7 @@ class JDBCOptions( val maxConnections = parameters.getOrElse(JDBC_MAX_CONNECTIONS, null) require(maxConnections == null || maxConnections.toInt > 0, s"Invalid value `$maxConnections` for parameter `$JDBC_MAX_CONNECTIONS`. " + - s"The minimum value is 1.") + "The minimum value is 1.") } object JDBCOptions { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index e3d3c6c3a887c..5795b4d860cb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -312,4 +312,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { .options(properties.asScala) .save() } + + test("SPARK-18413: Add `maxConnections` JDBCOption") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val e = intercept[IllegalArgumentException] { + df.write.format("jdbc") + .option("dbtable", "TEST.SAVETEST") + .option("url", url1) + .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0") + .save() + }.getMessage + assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1")) + } } From 28718d092aa81fa2c2b9b5f7856fabb308b78a8d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 18 Nov 2016 13:49:12 -0800 Subject: [PATCH 3/7] Update doc. --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 677846da17cb2..8c7db7b5a2544 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1089,7 +1089,7 @@ the following case-sensitive options: maxConnections - The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed. This option applies only to writing. It defaults to the number of partitions of RDD. + The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed. This option applies only to writing with coalesce operation if needed. It defaults to the number of partitions of RDD. From 0391dbf9df23b90dcd690886dc2c08b7cba7620b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 19 Nov 2016 04:06:03 -0800 Subject: [PATCH 4/7] Update docs --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 8c7db7b5a2544..372397f7279d5 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1089,7 +1089,7 @@ the following case-sensitive options: maxConnections - The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed. This option applies only to writing with coalesce operation if needed. It defaults to the number of partitions of RDD. + If set, limits the maximum number of JDBC connections that can be used. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing. From ca75a4e383ecd3e143c594a35279135022097ec3 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 19 Nov 2016 04:25:40 -0800 Subject: [PATCH 5/7] Update docs. --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 372397f7279d5..dde2a9b708c74 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1089,7 +1089,7 @@ the following case-sensitive options: maxConnections - If set, limits the maximum number of JDBC connections that can be used. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing. + The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing. From cbb74d9c985422fb7309d9af8ded559ffdeebe9c Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 20 Nov 2016 21:22:12 -0800 Subject: [PATCH 6/7] Use Option[Int]. --- .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 6 +++--- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 25bccaf3c4b8a..5cda63b1ef62d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -123,9 +123,9 @@ class JDBCOptions( case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE } // the maximum number of connections - val maxConnections = parameters.getOrElse(JDBC_MAX_CONNECTIONS, null) - require(maxConnections == null || maxConnections.toInt > 0, - s"Invalid value `$maxConnections` for parameter `$JDBC_MAX_CONNECTIONS`. " + + val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS) + require(maxConnections.isEmpty || maxConnections.get.toInt > 0, + s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " + "The minimum value is 1.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 30ccd293cd65f..5627753eeeb65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -669,8 +669,8 @@ object JdbcUtils extends Logging { val isolationLevel = options.isolationLevel val maxConnections = options.maxConnections val repartitionedDF = - if (maxConnections != null && maxConnections.toInt < df.rdd.getNumPartitions) { - df.coalesce(maxConnections.toInt) + if (maxConnections.isDefined && maxConnections.get.toInt < df.rdd.getNumPartitions) { + df.coalesce(maxConnections.get.toInt) } else { df } From dc152f45703a0225f750c4819556f19a1a9f0d21 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 20 Nov 2016 22:45:43 -0800 Subject: [PATCH 7/7] Use Option[Int] really. --- .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 4 ++-- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 5cda63b1ef62d..d416eec6ddaec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -123,8 +123,8 @@ class JDBCOptions( case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE } // the maximum number of connections - val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS) - require(maxConnections.isEmpty || maxConnections.get.toInt > 0, + val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt) + require(maxConnections.isEmpty || maxConnections.get > 0, s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " + "The minimum value is 1.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5627753eeeb65..cdc3c99daa1ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -669,8 +669,8 @@ object JdbcUtils extends Logging { val isolationLevel = options.isolationLevel val maxConnections = options.maxConnections val repartitionedDF = - if (maxConnections.isDefined && maxConnections.get.toInt < df.rdd.getNumPartitions) { - df.coalesce(maxConnections.get.toInt) + if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) { + df.coalesce(maxConnections.get) } else { df }