Conversation

@windpiger
Contributor

What changes were proposed in this pull request?

If we create an InMemoryFileIndex with empty rootPaths while PARALLEL_PARTITION_DISCOVERY_THRESHOLD is set to zero, it throws an exception:

Positive number of slices required
java.lang.IllegalArgumentException: Positive number of slices required
        at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
        at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186)
        at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
        at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
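
A minimal reproduction sketch, assuming the InMemoryFileIndex constructor shape used by the test suite in this PR (it is an internal class, so the exact signature may differ across Spark versions; `spark` here is an existing SparkSession):

```scala
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.internal.SQLConf

// Force the parallel listing branch even though there are no root paths.
spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key, "0")

// With zero root paths, the parallel branch tries to parallelize an empty
// collection into zero slices and fails with the exception above.
new InMemoryFileIndex(spark, rootPaths = Nil, parameters = Map.empty, partitionSchema = None)
```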

How was this patch tested?

unit test added

…en set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed
@windpiger
Contributor Author

cc @cloud-fan @gatorsmile

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73552 has finished for PR 17093 at commit 96898a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

test("InMemoryFileIndex with empty rootPaths when PARALLEL_PARTITION_DISCOVERY_THRESHOLD is 0") {
Member

After this fix, when users set it to -1, we still face the same strange error. We need a complete fix.

@windpiger (Contributor Author) Feb 28, 2017

I think users should not deliberately set it to a negative number; if they do, throwing an exception is reasonable.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73568 has finished for PR 17093 at commit a3ac29b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73583 has finished for PR 17093 at commit ec0afac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
Contributor

Why don't we just make sure parallelPartitionDiscoveryThreshold is greater than 0? We can add a condition (via checkValue) to SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.

Contributor Author

Sorry, I didn't notice there is a checkValue function; let me fix it. Thanks!
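
For context, the root cause can be shown in isolation: parallelizing a collection into zero slices is exactly what ParallelCollectionRDD rejects. A standalone sketch (assuming, as the stack trace suggests, that the parallel listing branch derives its slice count from the number of paths):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("zero-slices-demo").getOrCreate()

// An empty path list makes the derived slice count zero;
// ParallelCollectionRDD.slice rejects non-positive slice counts with
// "Positive number of slices required" when the job runs.
val paths = Seq.empty[String]
spark.sparkContext.parallelize(paths, numSlices = paths.size).collect()
```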


// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
Contributor Author

Adding the equals sign makes the conf parallelPartitionDiscoveryThreshold easier to understand: when paths.size equals the threshold, listing stays serial.

}

test("InMemoryFileIndex with empty rootPaths when PARALLEL_PARTITION_DISCOVERY_THRESHOLD" +
"is not positive number") {
Contributor

is negative

Contributor Author

The test also covers the zero value, so I used "not positive"...

Member

not positive number -> a nonpositive number

"files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
"LibSVM data sources.")
.intConf
.checkValue(parallel => parallel >= 0, "The maximum number of files allowed for listing " +
Contributor

Actually, should it be "The maximum number of root paths"?
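
For reference, the guarded conf entry assembled from the fragments in this diff (a sketch; the builder name and the default value follow Spark's SQLConf conventions and are assumptions here):

```scala
val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
  buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
    .doc("The maximum number of paths allowed for listing files at driver side. If the " +
      "number of detected paths exceeds this value during partition discovery, it tries " +
      "to list the files with another Spark distributed job. This applies to Parquet, " +
      "ORC, CSV, JSON and LibSVM data sources.")
    .intConf
    .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
      "files at driver side must not be negative")
    .createWithDefault(32)
```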

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73633 has finished for PR 17093 at commit 13b70f0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73650 has finished for PR 17093 at commit 0d2334d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

LGTM except #17093 (comment)

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73665 has finished for PR 17093 at commit 74d08a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}.getMessage
assert(e.contains("The maximum number of paths allowed for listing files at " +
"driver side must not be negative"))
Member

Nit: indent. : )

@windpiger (Contributor Author) Mar 1, 2017

oh...thanks~
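
Putting the review hunks together, the added test looks roughly like this (a sketch; `spark` is the suite's SparkSession, and the test title uses the "nonpositive" wording suggested above):

```scala
test("InMemoryFileIndex with empty rootPaths when PARALLEL_PARTITION_DISCOVERY_THRESHOLD" +
    " is a nonpositive number") {
  // Zero is a legal threshold now: the <= guard keeps the empty-paths case on
  // the serial listing branch instead of launching a zero-slice Spark job.
  withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
    new InMemoryFileIndex(spark, Nil, Map.empty, None)
  }

  // Negative values are rejected up front by checkValue with a clear message.
  val e = intercept[IllegalArgumentException] {
    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "-1") {
      new InMemoryFileIndex(spark, Nil, Map.empty, None)
    }
  }.getMessage
  assert(e.contains("The maximum number of paths allowed for listing files at " +
    "driver side must not be negative"))
}
```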

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73672 has finished for PR 17093 at commit e1a9072.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73673 has finished for PR 17093 at commit 3c079ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merging to master.

@asfgit asfgit closed this in 8aa560b Mar 1, 2017
asfgit pushed a commit that referenced this pull request Mar 3, 2017
…ed to listFiles twice

## What changes were proposed in this pull request?

Currently, when we call `resolveRelation` for a `FileFormat DataSource` without providing a user schema, it executes `listFiles` twice in `InMemoryFileIndex`.

This PR adds a `FileStatusCache` for DataSource, which avoids calling `listFiles` twice.

But there is a bug in `InMemoryFileIndex`; see
 [SPARK-19748](#17079) and
 [SPARK-19761](#17093),
so this PR should land after SPARK-19748 and SPARK-19761.

## How was this patch tested?
unit test added

Author: windpiger <[email protected]>

Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.
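
A sketch of the caching idea described in that commit (the `FileStatusCache.getOrCreate` factory exists in Spark's datasources package; the variable names and the exact constructor argument order here are illustrative):

```scala
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

// Reuse one session-scoped cache of leaf-file listings so that resolving the
// relation a second time does not re-list the same root paths.
val cache = FileStatusCache.getOrCreate(sparkSession)
val index = new InMemoryFileIndex(sparkSession, rootPaths, options, userSpecifiedSchema, cache)
```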