diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 656e7ecdab0bb..be53a8d03848a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1061,10 +1061,11 @@ the following case-sensitive options:
   </tr>
 
   <tr>
-    <td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
+    <td><code>partitionColumn, lowerBound, upperBound</code></td>
     <td>
-      These options must all be specified if any of them is specified. They describe how to
-      partition the table when reading in parallel from multiple workers.
+      These options must all be specified if any of them is specified. In addition,
+      <code>numPartitions</code> must be specified. They describe how to partition the table when
+      reading in parallel from multiple workers.
       <code>partitionColumn</code> must be a numeric column from the table in question. Notice
       that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
       partition stride, not for filtering the rows in table. So all rows in the table will be
@@ -1072,6 +1073,16 @@ the following case-sensitive options:
     </td>
   </tr>
 
+  <tr>
+    <td><code>numPartitions</code></td>
+    <td>
+      The maximum number of partitions that can be used for parallelism in table reading and
+      writing. This also determines the maximum number of concurrent JDBC connections.
+      If the number of partitions to write exceeds this limit, we decrease it to this limit by
+      calling <code>coalesce(numPartitions)</code> before writing.
+    </td>
+  </tr>
+
   <tr>
     <td><code>fetchsize</code></td>
     <td>
@@ -1086,13 +1097,6 @@ the following case-sensitive options:
     </td>
   </tr>
 
-  <tr>
-    <td><code>maxConnections</code></td>
-    <td>
-      The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing.
-    </td>
-  </tr>
-
   <tr>
     <td><code>isolationLevel</code></td>
    <td>
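To make the documented read-path behavior concrete, here is a minimal usage sketch, assuming a hypothetical URL, table, and numeric column; all four options travel together, and the bounds only set the stride:

```scala
// Hedged read-side sketch: the url, dbtable, and partitionColumn values are
// hypothetical, not from this patch.
val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbserver:5432/mydb") // hypothetical URL
  .option("dbtable", "people")                           // hypothetical table
  .option("partitionColumn", "id") // must be a numeric column
  .option("lowerBound", "1")       // with upperBound, sets the stride only
  .option("upperBound", "1000000") // rows outside the bounds are still read
  .option("numPartitions", "10")   // also caps concurrent JDBC connections
  .load()
```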
" + + "The minimum value is 1.") + // ------------------------------------------------------------ // Optional parameters only for reading // ------------------------------------------------------------ // the column used to partition - val partitionColumn = parameters.getOrElse(JDBC_PARTITION_COLUMN, null) + val partitionColumn = parameters.get(JDBC_PARTITION_COLUMN) // the lower bound of partition column - val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null) + val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong) // the upper bound of the partition column - val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null) - // the number of partitions - val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null) - require(partitionColumn == null || - (lowerBound != null && upperBound != null && numPartitions != null), + val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong) + require(partitionColumn.isEmpty || + (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined), s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," + s" and '$JDBC_NUM_PARTITIONS' are required.") val fetchSize = { @@ -122,11 +126,6 @@ class JDBCOptions( case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE } - // the maximum number of connections - val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt) - require(maxConnections.isEmpty || maxConnections.get > 0, - s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " + - "The minimum value is 1.") } object JDBCOptions { @@ -149,5 +148,4 @@ object JDBCOptions { val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions") val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") - val JDBC_MAX_CONNECTIONS = newOption("maxConnections") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 4420b3b18a907..74f397c01e2f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -35,11 +35,13 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn == null) { + val partitionInfo = if (partitionColumn.isEmpty) { + assert(lowerBound.isEmpty && upperBound.isEmpty) null } else { + assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty) JDBCPartitioningInfo( - partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) + partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get) } val parts = JDBCRelation.columnPartition(partitionInfo) JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index cdc3c99daa1ab..c2a1ad84b952e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -667,10 +667,10 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cdc3c99daa1ab..c2a1ad84b952e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -667,10 +667,10 @@ object JdbcUtils extends Logging {
     val getConnection: () => Connection = createConnectionFactory(options)
     val batchSize = options.batchSize
     val isolationLevel = options.isolationLevel
-    val maxConnections = options.maxConnections
+    val numPartitions = options.numPartitions
     val repartitionedDF =
-      if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
-        df.coalesce(maxConnections.get)
+      if (numPartitions.isDefined && numPartitions.get < df.rdd.getNumPartitions) {
+        df.coalesce(numPartitions.get)
       } else {
         df
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 5795b4d860cb1..c834419948c53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -313,15 +313,15 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       .save()
   }
 
-  test("SPARK-18413: Add `maxConnections` JDBCOption") {
+  test("SPARK-18413: Use `numPartitions` JDBCOption") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val e = intercept[IllegalArgumentException] {
       df.write.format("jdbc")
         .option("dbtable", "TEST.SAVETEST")
         .option("url", url1)
-        .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+        .option(s"${JDBCOptions.JDBC_NUM_PARTITIONS}", "0")
         .save()
     }.getMessage
-    assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+    assert(e.contains("Invalid value `0` for parameter `numPartitions`. The minimum value is 1"))
   }
 }
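On the write path, the `JdbcUtils` and test changes above mean `numPartitions` now plays the role `maxConnections` used to: it caps the operation's parallelism and therefore the number of concurrent JDBC connections. A hedged usage sketch (the URL and table name are illustrative):

```scala
// If the input has more partitions than numPartitions, the write path calls
// coalesce(numPartitions) first; with fewer partitions it is left unchanged.
df.repartition(64) // suppose the input arrives with 64 partitions
  .write
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb") // hypothetical URL
  .option("dbtable", "TEST.SAVETEST")
  .option("numPartitions", "8") // write coalesces 64 -> 8 before saving
  .save()
```

Using `coalesce` rather than `repartition` avoids a shuffle, which is why the change only ever decreases the partition count before writing.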