Commit dae4d5d

maropu authored and yhuai committed
[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas
## What changes were proposed in this pull request?

This PR sets the default number of partitions when reading parquet schemas. `SQLContext#read#parquet` currently yields at least n_executors * n_cores tasks even if the parquet data consists of a single small file, which can increase the latency of small jobs.

## How was this patch tested?

Manually tested and checked.

Author: Takeshi YAMAMURO <[email protected]>

Closes #13137 from maropu/SPARK-15247.
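A minimal spark-shell style sketch of the capping logic this patch introduces; the file count and `defaultParallelism` values are hypothetical, chosen only to illustrate the task counts before and after the change.

```scala
// Hypothetical numbers: 3 small parquet footer files on a cluster whose
// SparkContext reports defaultParallelism = 32 (e.g. 8 executors x 4 cores).
val numFiles = 3
val defaultParallelism = 32

// Before this patch, parallelize() fell back to defaultParallelism slices, so
// merging 3 footers still launched 32 tasks, most of them empty. The patch
// caps the slice count at the file count, keeping at least one slice:
val numParallelism = Math.min(Math.max(numFiles, 1), defaultParallelism)
assert(numParallelism == 3) // 3 tasks instead of 32
```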
1 parent bd39ffe commit dae4d5d

File tree

1 file changed: +6 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 6 additions & 1 deletion

```diff
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
     // side, and resemble fake `FileStatus`es there.
     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+    // Set the number of partitions to prevent following schema reads from generating many tasks
+    // in case of a small number of parquet files.
+    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+      sparkSession.sparkContext.defaultParallelism)
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
         .sparkContext
-        .parallelize(partialFileStatusInfo)
+        .parallelize(partialFileStatusInfo, numParallelism)
         .mapPartitions { iterator =>
           // Resembles fake `FileStatus`es with serialized path and length information.
           val fakeFileStatuses = iterator.map { case (path, length) =>
```
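For context, a hedged sketch of how this code path is typically reached: the parallel schema merge in `ParquetFileFormat` runs when parquet schema merging is requested at read time. The app name and input path below are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver snippet (spark-shell style); path and app name are made up.
val spark = SparkSession.builder().appName("parquet-schema-read").getOrCreate()

// With schema merging enabled, reading a directory holding only a few parquet
// files now launches at most one footer-read task per file rather than
// defaultParallelism tasks.
val df = spark.read
  .option("mergeSchema", "true") // requests the parallel schema merge
  .parquet("/tmp/small-parquet-dir")
```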
