
Commit 8aa560b

windpiger authored and gatorsmile committed
[SPARK-19761][SQL] Creating an InMemoryFileIndex with empty rootPaths fails when PARALLEL_PARTITION_DISCOVERY_THRESHOLD is set to zero
## What changes were proposed in this pull request?

If we create an InMemoryFileIndex with empty rootPaths while PARALLEL_PARTITION_DISCOVERY_THRESHOLD is set to zero, it throws an exception:

```
Positive number of slices required
java.lang.IllegalArgumentException: Positive number of slices required
	at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
	at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
	at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186)
	at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105)
	at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33)
	at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185)
	at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
	at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

A unit test was added.

Author: windpiger <[email protected]>

Closes #17093 from windpiger/fixEmptiPathInBulkListFiles.
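To make the failure concrete, here is a minimal repro sketch mirroring the unit test added below. The builder settings are illustrative, not part of the patch, and `InMemoryFileIndex` lives in Spark's internal `execution.datasources` package:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-19761-repro")
  .getOrCreate()

// Force every listing, even of zero paths, toward the parallel code path.
spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key, "0")

// Before this patch: paths.size (0) < threshold (0) is false, so the parallel
// listing branch ran and built a ParallelCollectionRDD with zero slices,
// throwing "Positive number of slices required". With the patch, 0 <= 0 takes
// the serial branch and construction succeeds.
new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
```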
1 parent 5502a9c · commit 8aa560b

File tree

3 files changed: +21 −3 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -300,7 +300,7 @@ object PartitioningAwareFileIndex extends Logging {
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
-    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
         (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
       }
```
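This one-character change is the whole fix: with empty rootPaths and a threshold of 0, the old strict comparison (`0 < 0` is false) fell through to the distributed listing, which, as the stack trace shows, builds a ParallelCollectionRDD whose slice count is derived from the number of paths. A standalone sketch of the predicate at the boundary (the helper name is hypothetical, for illustration only):

```scala
// Hypothetical helper isolating the changed comparison; `fixed` selects the
// post-patch predicate.
def usesSerialListing(numPaths: Int, threshold: Int, fixed: Boolean): Boolean =
  if (fixed) numPaths <= threshold else numPaths < threshold

// Boundary case from SPARK-19761: zero paths, threshold zero.
assert(!usesSerialListing(numPaths = 0, threshold = 0, fixed = false)) // old: parallel branch, crashes
assert(usesSerialListing(numPaths = 0, threshold = 0, fixed = true))   // new: serial branch
```

The serial branch simply maps over the (empty) path list, so it returns an empty result instead of asking Spark for a zero-slice RDD.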

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -402,11 +402,13 @@ object SQLConf {
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
-      .doc("The maximum number of files allowed for listing files at driver side. If the number " +
-        "of detected files exceeds this value during partition discovery, it tries to list the " +
+      .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
+        "of detected paths exceeds this value during partition discovery, it tries to list the " +
         "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
         "LibSVM data sources.")
       .intConf
+      .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
+        "files at driver side must not be negative")
       .createWithDefault(32)
 
   val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
```
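With the new `checkValue`, a negative threshold is rejected when the configuration is set, carrying the message the test below asserts on. A quick sketch, assuming an existing SparkSession named `spark`:

```scala
// Setting a negative value now fails fast at configuration time instead of
// surfacing later as an RDD slice-count error.
try {
  spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "-1")
} catch {
  case e: IllegalArgumentException =>
    // Prints: "The maximum number of paths allowed for listing files at
    // driver side must not be negative"
    println(e.getMessage)
}
```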

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 16 additions & 0 deletions
```diff
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 class FileIndexSuite extends SharedSQLContext {
@@ -179,6 +180,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("InMemoryFileIndex with empty rootPaths when PARALLEL_PARTITION_DISCOVERY_THRESHOLD" +
+    "is a nonpositive number") {
+    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+      new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
+    }
+
+    val e = intercept[IllegalArgumentException] {
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "-1") {
+        new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
+      }
+    }.getMessage
+    assert(e.contains("The maximum number of paths allowed for listing files at " +
+      "driver side must not be negative"))
+  }
+
   test("refresh for InMemoryFileIndex with FileStatusCache") {
     withTempDir { dir =>
       val fileStatusCache = FileStatusCache.getOrCreate(spark)
```
