
Commit c8879bf

fidato13 authored and rxin committed
[SPARK-16575][CORE] partition calculation mismatch with sc.binaryFiles
## What changes were proposed in this pull request?

This pull request contains the changes for the critical bug SPARK-16575. It fixes the BinaryFileRDD partition calculation: an RDD created with sc.binaryFiles always ended up with just two partitions, regardless of the input.

## How was this patch tested?

The original issue (getNumPartitions on a binary-files RDD always returning two partitions) was first reproduced and then verified against the changes. The existing unit tests were also run and pass.

This contribution is my original work and I license the work to the project under the project's open source license.

srowen hvanhovell rxin vanzin skyluc kmader zsxwing datafarmer Please have a look.

Author: fidato <[email protected]>

Closes #15327 from fidato13/SPARK-16575.

(cherry picked from commit 6f36971)
Signed-off-by: Reynold Xin <[email protected]>
1 parent 4cb4e5f commit c8879bf
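
For illustration, a minimal sketch of the behaviour the commit message describes, assuming a hypothetical directory of many small binary files (the path and cluster settings are placeholders, not part of the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical reproduction of SPARK-16575: read a directory of many small
    // binary files and inspect the partition count of the resulting RDD.
    val sc = new SparkContext(new SparkConf().setAppName("binary-files-repro").setMaster("local[4]"))
    val rdd = sc.binaryFiles("/path/to/many/small/files")  // placeholder path
    // Before this change the RDD reported only two partitions regardless of input size;
    // after it, the count follows the split-size formula introduced below.
    println(rdd.getNumPartitions)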

File tree

4 files changed, +42 -5 lines changed


core/src/main/scala/org/apache/spark/input/PortableDataStream.scala

Lines changed: 11 additions & 3 deletions
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
 
+import org.apache.spark.internal.config
+import org.apache.spark.SparkContext
+
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
@@ -40,9 +43,14 @@ private[spark] abstract class StreamFileInputFormat[T]
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
    * which is set through setMaxSplitSize
    */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
-    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
+    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
+    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
+    val defaultParallelism = sc.defaultParallelism
+    val files = listStatus(context).asScala
+    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     super.setMaxSplitSize(maxSplitSize)
   }
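
To make the new calculation concrete, here is a small worked example of the formula above with illustrative numbers (the file counts and sizes are assumptions, not taken from the commit):

    // 100 files of 1 MB each, default parallelism of 8, default config values.
    val defaultMaxSplitBytes = 128L * 1024 * 1024  // spark.files.maxPartitionBytes default
    val openCostInBytes = 4L * 1024 * 1024         // spark.files.openCostInBytes default
    val defaultParallelism = 8
    val fileSizes = Seq.fill(100)(1L * 1024 * 1024)

    val totalBytes = fileSizes.map(_ + openCostInBytes).sum          // 524,288,000 bytes
    val bytesPerCore = totalBytes / defaultParallelism               // 65,536,000 bytes (~62.5 MB)
    val maxSplitSize = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
    // maxSplitSize == 65,536,000: each CombineFileSplit packs roughly 62.5 MB of
    // (file size + open cost), so the RDD ends up with about 8 partitions instead of 2.

Taking the max with openCostInBytes keeps splits from becoming smaller than the cost of opening a file, while the min with defaultMaxSplitBytes caps how much data a single partition can hold.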

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 13 additions & 0 deletions
@@ -206,4 +206,17 @@ package object config {
       "encountering corrupt files and contents that have been read will still be returned.")
     .booleanConf
     .createWithDefault(false)
+
+  private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
+    .doc("The maximum number of bytes to pack into a single partition when reading files.")
+    .longConf
+    .createWithDefault(128 * 1024 * 1024)
+
+  private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
+    .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+      " the same time. This is used when putting multiple files into a partition. It's better to" +
+      " over estimate, then the partitions with small files will be faster than partitions with" +
+      " bigger files.")
+    .longConf
+    .createWithDefault(4 * 1024 * 1024)
 }
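
These entries register the two new keys and their defaults. A sketch of overriding them when constructing a context, with purely illustrative values:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative tuning: cap partitions at 64 MB and assume a 1 MB open cost,
    // e.g. when packing a very large number of small binary files.
    val conf = new SparkConf()
      .setAppName("binary-files-tuning")
      .set("spark.files.maxPartitionBytes", (64L * 1024 * 1024).toString)
      .set("spark.files.openCostInBytes", (1L * 1024 * 1024).toString)
    val sc = new SparkContext(conf)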

core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.input.StreamFileInputFormat
 
 private[spark] class BinaryFileRDD[T](
-    sc: SparkContext,
+    @transient private val sc: SparkContext,
     inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
     keyClass: Class[String],
     valueClass: Class[T],
@@ -43,7 +43,7 @@ private[spark] class BinaryFileRDD[T](
       case _ =>
     }
     val jobContext = new JobContextImpl(conf, jobId)
-    inputFormat.setMinPartitions(jobContext, minPartitions)
+    inputFormat.setMinPartitions(sc, jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
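
Marking sc as @transient works here because getPartitions runs only on the driver, so the context never needs to travel with the serialized RDD. A minimal sketch of the same pattern with a hypothetical class (not part of this commit):

    import org.apache.spark.SparkContext

    // Hypothetical illustration: the context is consulted only during driver-side
    // planning, so excluding it from serialization avoids shipping SparkContext state.
    class PlanOnDriver(@transient private val sc: SparkContext) extends Serializable {
      def targetParallelism: Int = sc.defaultParallelism  // driver-only call
    }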

docs/configuration.md

Lines changed: 16 additions & 0 deletions
@@ -1034,6 +1034,22 @@ Apart from these, the following properties are also available, and may be useful
   its contents do not match those of the source.
   </td>
 </tr>
+<tr>
+  <td><code>spark.files.maxPartitionBytes</code></td>
+  <td>134217728 (128 MB)</td>
+  <td>
+    The maximum number of bytes to pack into a single partition when reading files.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.files.openCostInBytes</code></td>
+  <td>4194304 (4 MB)</td>
+  <td>
+    The estimated cost to open a file, measured by the number of bytes could be scanned in the same
+    time. This is used when putting multiple files into a partition. It is better to over estimate,
+    then the partitions with small files will be faster than partitions with bigger files.
+  </td>
+</tr>
 <tr>
   <td><code>spark.hadoop.cloneConf</code></td>
   <td>false</td>
