[SPARK-22357][CORE][FOLLOWUP] SparkContext.binaryFiles ignore minPartitions parameter #22356
New issue for SPARK-22357.
Conversation
imatiach-msft left a comment:
nice test!
imatiach-msft left a comment:
comments
    StandardCharsets.UTF_8)
}

assert(sc.binaryFiles(tempDirPath, minPartitions = 1).getNumPartitions === 1)
nitpick: maybe put these three asserts in a loop
OK, sure
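One possible shape for the suggested loop, shown here as a hypothetical sketch (the `(minPartitions, expected)` pairs are illustrative assumptions, not taken from the actual suite):

```scala
// Sketch of the reviewer's suggestion: drive the three asserts
// from a table of (minPartitions, expected partition count) pairs.
Seq(1 -> 1, 2 -> 2, 3 -> 3).foreach { case (minPartitions, expected) =>
  assert(sc.binaryFiles(tempDirPath, minPartitions).getNumPartitions === expected)
}
```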
test("SPARK-22357 test binaryFiles minPartitions") {
  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
    .set("spark.files.openCostInBytes", "0")
Why is this setting needed: spark.files.openCostInBytes?
This removes its effect in the section of code we're really trying to test:
def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
  val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
  val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
  val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
  val files = listStatus(context).asScala
  val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  super.setMaxSplitSize(maxSplitSize)
}
ah, I see, thanks for pointing that out!
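To make the effect concrete, here is a small standalone sketch of the split-size arithmetic from setMinPartitions above. The file sizes and the 4 MB open cost are illustrative assumptions (4 MB happens to be Spark's documented default for spark.files.openCostInBytes, and 128 MB for spark.files.maxPartitionBytes, but nothing here reads real configuration):

```scala
// Standalone sketch of the maxSplitSize arithmetic in setMinPartitions.
object SplitSizeSketch {
  def maxSplitSize(fileSizes: Seq[Long], openCostInBytes: Long,
                   defaultParallelism: Int, minPartitions: Int): Long = {
    val defaultMaxSplitBytes = 128L * 1024 * 1024 // assumed 128 MB cap
    val parallelism = math.max(defaultParallelism, minPartitions)
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / parallelism
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(10L, 10L, 10L) // three tiny files
    // With a 4 MB open cost, the open cost dominates totalBytes, so the
    // split size stays near 4 MB regardless of minPartitions:
    println(maxSplitSize(sizes, 4L * 1024 * 1024, 1, 3))
    // With openCostInBytes = 0, the split size tracks bytesPerCore,
    // so minPartitions actually influences the partitioning:
    println(maxSplitSize(sizes, 0L, 1, 3))
  }
}
```

This is why the test zeroes out spark.files.openCostInBytes: it removes the term that would otherwise mask the minPartitions behavior being tested.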
Test build #95769 has finished for PR 22356 at commit
Test build #95771 has finished for PR 22356 at commit
Thanks for taking my code. Looks good.
Merged to master/2.4
…itions parameter
## What changes were proposed in this pull request?
This adds a test following #21638
## How was this patch tested?
Existing tests and new test.
Closes #22356 from srowen/SPARK-22357.2.
Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 4e3365b)
Signed-off-by: Sean Owen <[email protected]>
What changes were proposed in this pull request?
This adds a test following #21638
How was this patch tested?
Existing tests and new test.