From 112f9d163208ae856d156c13901f438cf026cdda Mon Sep 17 00:00:00 2001
From: "genmao.ygm"
Date: Wed, 9 Nov 2016 19:09:20 +0800
Subject: [PATCH 1/2] SPARK-18379: Make the parallelism of parallelPartitionDiscovery configurable.

---
 .../datasources/PartitioningAwareFileIndex.scala       |  4 +++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala  | 10 ++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index a8a722dd3c62..de36ac1f3884 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -317,10 +317,12 @@ object PartitioningAwareFileIndex extends Logging {
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
+    val parallelPartitionDiscoveryParallelism =
+      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
 
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
+    val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71f3a67d0d5a..bb8e0ddde94a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -396,6 +396,13 @@
       .intConf
       .createWithDefault(32)
 
+  val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
+    SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
+      .doc("The degree of parallelism used to list a collection of paths recursively. Set the " +
+        "number to prevent file listing from generating too many tasks.")
+      .intConf
+      .createWithDefault(100)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -774,6 +781,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
+  def parallelPartitionDiscoveryParallelism: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
+
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =

From f6cf77f284a73a8aed5ae1349a0697a44d09972b Mon Sep 17 00:00:00 2001
From: dylon
Date: Mon, 14 Nov 2016 09:31:52 +0800
Subject: [PATCH 2/2] Update the default value and visibility of parallelPartitionDiscoveryParallelism

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bb8e0ddde94a..6372936bd7fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -400,8 +400,9 @@ object SQLConf {
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
       .doc("The degree of parallelism used to list a collection of paths recursively. Set the " +
         "number to prevent file listing from generating too many tasks.")
+      .internal()
       .intConf
-      .createWithDefault(100)
+      .createWithDefault(10000)
 
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
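
For completeness, a minimal sketch of how the new knob would be exercised, assuming a build with this patch series applied. The object name, local master, and /data/events path are placeholders; per the neighboring PARALLEL_PARTITION_DISCOVERY_THRESHOLD config, the listing only goes distributed once the number of paths to scan exceeds spark.sql.sources.parallelPartitionDiscovery.threshold (default 32).

```scala
import org.apache.spark.sql.SparkSession

object ParallelDiscoveryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ParallelDiscoveryDemo")
      .master("local[4]")
      // Cap the distributed file-listing job at 64 tasks. After PATCH 2/2 the
      // config is internal() and defaults to 10000, but it can still be set.
      .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "64")
      .getOrCreate()

    // Reading a heavily partitioned layout (placeholder path) triggers
    // partition discovery; listing is distributed only when the path count
    // exceeds spark.sql.sources.parallelPartitionDiscovery.threshold.
    val df = spark.read.parquet("/data/events")
    df.printSchema()

    spark.stop()
  }
}
```

Either way the listing job's task count stays bounded by the number of paths, since the patch computes numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism); setting the config above paths.size has no effect.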