From 7df41a1fca44cacd6649a9ca51e8969dc24d1721 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 21 Nov 2016 12:24:36 -0800 Subject: [PATCH 1/7] [SPARK-18413][SQL][FOLLOW-UP] Use `numPartitions` instead of `maxConnections` JDBCOption --- docs/sql-programming-guide.md | 18 ++++++++++-------- .../datasources/jdbc/JDBCOptions.scala | 16 +++++++--------- .../jdbc/JdbcRelationProvider.scala | 2 +- .../execution/datasources/jdbc/JdbcUtils.scala | 6 +++--- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 6 +++--- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 656e7ecdab0bb..451fde7528052 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1061,7 +1061,7 @@ the following case-sensitive options: - partitionColumn, lowerBound, upperBound, numPartitions + partitionColumn, lowerBound, upperBound These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. @@ -1072,6 +1072,15 @@ the following case-sensitive options: + + numPartitions + + The number of partitions that can be used, if set. It works by limiting both read and write + operations' parallelism. If the number of partitions to write exceeds this limit, the + operation will coalesce the data set to fewer partitions before writing. + + + fetchsize @@ -1086,13 +1095,6 @@ the following case-sensitive options: - - maxConnections - - The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing. - - - isolationLevel diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index d416eec6ddaec..2bf06059d7151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -74,6 +74,12 @@ class JDBCOptions( } } + // the number of partitions + val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt) + require(numPartitions.isEmpty || numPartitions.get > 0, + s"Invalid value `${numPartitions.get}` for parameter `$JDBC_NUM_PARTITIONS`. " + + "The minimum value is 1.") + // ------------------------------------------------------------ // Optional parameters only for reading // ------------------------------------------------------------ @@ -83,10 +89,8 @@ class JDBCOptions( val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null) // the upper bound of the partition column val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null) - // the number of partitions - val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null) require(partitionColumn == null || - (lowerBound != null && upperBound != null && numPartitions != null), + (lowerBound != null && upperBound != null && numPartitions.isDefined), s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," + s" and '$JDBC_NUM_PARTITIONS' are required.") val fetchSize = { @@ -122,11 +126,6 @@ class JDBCOptions( case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE } - // the maximum number of connections - val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt) - require(maxConnections.isEmpty || maxConnections.get > 0, - s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " + - "The minimum value is 1.") } object JDBCOptions { @@ -149,5 +148,4 @@ object JDBCOptions { val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions") val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") - val JDBC_MAX_CONNECTIONS = newOption("maxConnections") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 4420b3b18a907..d9471f79a1804 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -39,7 +39,7 @@ class JdbcRelationProvider extends CreatableRelationProvider null } else { JDBCPartitioningInfo( - partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) + partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.get) } val parts = JDBCRelation.columnPartition(partitionInfo) JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index cdc3c99daa1ab..c2a1ad84b952e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -667,10 +667,10 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel - val maxConnections = options.maxConnections + val numPartitions = options.numPartitions val repartitionedDF = - if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) { - df.coalesce(maxConnections.get) + if (numPartitions.isDefined && numPartitions.get < df.rdd.getNumPartitions) { + df.coalesce(numPartitions.get) } else { df } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 5795b4d860cb1..c834419948c53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -313,15 +313,15 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { .save() } - test("SPARK-18413: Add `maxConnections` JDBCOption") { + test("SPARK-18413: Use `numPartitions` JDBCOption") { val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) val e = intercept[IllegalArgumentException] { df.write.format("jdbc") .option("dbtable", "TEST.SAVETEST") .option("url", url1) - .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0") + .option(s"${JDBCOptions.JDBC_NUM_PARTITIONS}", "0") .save() }.getMessage - assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1")) + assert(e.contains("Invalid value `0` for parameter `numPartitions`. The minimum value is 1")) } } From 48b6d2513ddebcf365999c8567a7e6dbcc5a3bb2 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 21 Nov 2016 13:40:30 -0800 Subject: [PATCH 2/7] Update docs. --- docs/sql-programming-guide.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 451fde7528052..936253814e1b8 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1077,7 +1077,8 @@ the following case-sensitive options: The number of partitions that can be used, if set. It works by limiting both read and write operations' parallelism. If the number of partitions to write exceeds this limit, the - operation will coalesce the data set to fewer partitions before writing. + operation will coalesce the data set to fewer partitions before writing. In other words, + this determines the maximum number of concurrent JDBC connections. From f8c67bab7674a1d5f55a994118d31f6ecddc7311 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 21 Nov 2016 15:23:28 -0800 Subject: [PATCH 3/7] Update docs. --- docs/sql-programming-guide.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 936253814e1b8..fe24fd314a441 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1063,8 +1063,9 @@ the following case-sensitive options: partitionColumn, lowerBound, upperBound - These options must all be specified if any of them is specified. They describe how to - partition the table when reading in parallel from multiple workers. + These options must all be specified if any of them is specified. In addition, + numPartitions must be specified. They describe how to partition the table when + reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be @@ -1076,9 +1077,10 @@ the following case-sensitive options: numPartitions The number of partitions that can be used, if set. It works by limiting both read and write - operations' parallelism. If the number of partitions to write exceeds this limit, the - operation will coalesce the data set to fewer partitions before writing. In other words, - this determines the maximum number of concurrent JDBC connections. + operations' parallelism. In other words, this determines the maximum number of concurrent + JDBC connections. For reading, it will make partitions less than or equal to this maximum. + For writing, if the number of partitions to write exceeds this limit, the operation will + coalesce the data set with this maximum before writing. From ba8be4626759b9334547de1a2f1e34e0e7d5b842 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 22 Nov 2016 10:22:23 -0800 Subject: [PATCH 4/7] Update docs. --- docs/sql-programming-guide.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index fe24fd314a441..be53a8d03848a 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1076,11 +1076,10 @@ the following case-sensitive options: numPartitions - The number of partitions that can be used, if set. It works by limiting both read and write - operations' parallelism. In other words, this determines the maximum number of concurrent - JDBC connections. For reading, it will make partitions less than or equal to this maximum. - For writing, if the number of partitions to write exceeds this limit, the operation will - coalesce the data set with this maximum before writing. + The maximum number of partitions that can be used for parallelism in table reading and + writing. This also determines the maximum number of concurrent JDBC connections. + If the number of partitions to write exceeds this limit, we decrease it to this limit by + calling coalesce(numPartitions) before writing. From d45467eedd1fb4fe31a2f667f0188b15c6ac3cd7 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 22 Nov 2016 12:40:10 -0800 Subject: [PATCH 5/7] Address comments. --- .../sql/execution/datasources/jdbc/JdbcRelationProvider.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index d9471f79a1804..f037f6da00623 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -35,7 +35,8 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn == null) { + val partitionInfo = if (partitionColumn == null || lowerBound == null || upperBound == null || + numPartitions.isEmpty) { null } else { JDBCPartitioningInfo( From f9db374d7c90f8d8d7875a13cc17644821779e88 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 23 Nov 2016 20:04:30 -0800 Subject: [PATCH 6/7] Use `Option` type. --- .../sql/execution/datasources/jdbc/JDBCOptions.scala | 10 +++++----- .../datasources/jdbc/JdbcRelationProvider.scala | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 2bf06059d7151..fe2f4c1d78647 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -84,13 +84,13 @@ class JDBCOptions( // Optional parameters only for reading // ------------------------------------------------------------ // the column used to partition - val partitionColumn = parameters.getOrElse(JDBC_PARTITION_COLUMN, null) + val partitionColumn = parameters.get(JDBC_PARTITION_COLUMN) // the lower bound of partition column - val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null) + val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong) // the upper bound of the partition column - val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null) - require(partitionColumn == null || - (lowerBound != null && upperBound != null && numPartitions.isDefined), + val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong) + require(partitionColumn.isEmpty || + (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined), s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," + s" and '$JDBC_NUM_PARTITIONS' are required.") val fetchSize = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index f037f6da00623..90277454db95b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -35,12 +35,12 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn == null || lowerBound == null || upperBound == null || + val partitionInfo = if (partitionColumn.isEmpty || lowerBound.isEmpty || upperBound.isEmpty || numPartitions.isEmpty) { null } else { JDBCPartitioningInfo( - partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.get) + partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get) } val parts = JDBCRelation.columnPartition(partitionInfo) JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession) From 4a957e420a873e8dd976fc0aa2d59564c05d667b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 24 Nov 2016 19:34:13 -0800 Subject: [PATCH 7/7] Add `assert`. --- .../execution/datasources/jdbc/JdbcRelationProvider.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 90277454db95b..74f397c01e2f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -35,10 +35,11 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn.isEmpty || lowerBound.isEmpty || upperBound.isEmpty || - numPartitions.isEmpty) { + val partitionInfo = if (partitionColumn.isEmpty) { + assert(lowerBound.isEmpty && upperBound.isEmpty) null } else { + assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty) JDBCPartitioningInfo( partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get) }