diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index a8a722dd3c62..de36ac1f3884 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -317,10 +317,14 @@ object PartitioningAwareFileIndex extends Logging {
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
+    val parallelPartitionDiscoveryParallelism =
+      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
 
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
+    // The cap is user-configurable, so clamp the result to at least 1: a non-positive setting
+    // would otherwise be handed to `parallelize` below as the slice count.
+    val numParallelism = Math.max(1, Math.min(paths.size, parallelPartitionDiscoveryParallelism))
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71f3a67d0d5a..6372936bd7fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -396,6 +396,14 @@ object SQLConf {
       .intConf
       .createWithDefault(32)
 
+  val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
+    SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
+      .doc("The number of parallelism to list a collection of path recursively, Set the " +
+        "number to prevent file listing from generating too many tasks.")
+      .internal()
+      .intConf
+      .createWithDefault(10000)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -774,6 +782,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
+  def parallelPartitionDiscoveryParallelism: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
+
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =