Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,12 @@ object PartitioningAwareFileIndex extends Logging {
val sparkContext = sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = paths.map(_.toString)
val parallelPartitionDiscoveryParallelism =
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

// Cap the parallelism so that the following file listing does not generate too many tasks
// when #defaultParallelism is large.
val numParallelism = Math.min(paths.size, 10000)
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

val statusMap = sparkContext
.parallelize(serializedPaths, numParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ object SQLConf {
.intConf
.createWithDefault(32)

// Internal integer setting, keyed "spark.sql.sources.parallelPartitionDiscovery.parallelism",
// that limits the parallelism used when listing a collection of paths recursively, so the
// listing does not generate too many tasks. Defaults to 10000.
val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
.doc("The number of parallelism to list a collection of path recursively, Set the " +
"number to prevent file listing from generating too many tasks.")
.internal()
.intConf
.createWithDefault(10000)

// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
Expand Down Expand Up @@ -774,6 +782,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
/** Returns the value read via `getConf` for `SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD`. */
def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

/** Returns the value read via `getConf` for `SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM`. */
def parallelPartitionDiscoveryParallelism: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)

/** Returns the value read via `getConf` for `SQLConf.BUCKETING_ENABLED`. */
def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
Expand Down