@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}

import org.apache.spark.internal.config
import org.apache.spark.SparkContext

/**
* A general format for reading whole files in as streams, byte arrays,
* or other functions to be added
@@ -40,9 +43,14 @@ private[spark] abstract class StreamFileInputFormat[T]
* Allow minPartitions to be set by the end user in order to keep compatibility with the old
* Hadoop API, where the split size is set through setMaxSplitSize
*/
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
-    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
+    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
+    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
+    val defaultParallelism = sc.defaultParallelism
+    val files = listStatus(context).asScala
+    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
super.setMaxSplitSize(maxSplitSize)
}
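In words, the new rule floors the split size at the open cost, aims for roughly one split's worth of bytes per core, and caps the result at spark.files.maxPartitionBytes. A minimal standalone sketch of that arithmetic follows; the object name, default values, and example file sizes are illustrative only, not part of the patch:

// Standalone sketch of the split-size arithmetic above; names and inputs are hypothetical.
object SplitSizeSketch {
  def maxSplitSize(
      fileLengths: Seq[Long],
      defaultMaxSplitBytes: Long = 128L * 1024 * 1024,  // spark.files.maxPartitionBytes default
      openCostInBytes: Long = 4L * 1024 * 1024,         // spark.files.openCostInBytes default
      defaultParallelism: Int = 8): Long = {
    // Pad each file by the open cost so that many tiny files do not collapse into one huge split.
    val totalBytes = fileLengths.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // 1000 files of 1 MB each: totalBytes = 1000 * 5 MB, bytesPerCore is about 625 MB,
    // so the result is capped at the 128 MB maximum.
    println(maxSplitSize(Seq.fill(1000)(1L * 1024 * 1024)))
  }
}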

13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -206,4 +206,17 @@ package object config {
"encountering corrupt files and contents that have been read will still be returned.")
.booleanConf
.createWithDefault(false)

private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
.longConf
.createWithDefault(128 * 1024 * 1024)

private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
.doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
" the same time. This is used when putting multiple files into a partition. It's better to" +
" over estimate, then the partitions with small files will be faster than partitions with" +
" bigger files.")
.longConf
.createWithDefault(4 * 1024 * 1024)
}
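These defaults can be overridden per application. A minimal sketch of doing so through SparkConf; the application name and the chosen values are arbitrary:

// Sketch: overriding the two new settings when building the SparkContext.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("binary-files-example")                                  // hypothetical app name
  .set("spark.files.maxPartitionBytes", (64L * 1024 * 1024).toString)  // cap partitions at 64 MB
  .set("spark.files.openCostInBytes", (2L * 1024 * 1024).toString)     // assume cheaper file opens
val sc = new SparkContext(conf)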
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.input.StreamFileInputFormat

private[spark] class BinaryFileRDD[T](
-    sc: SparkContext,
+    @transient private val sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@@ -43,7 +43,7 @@ private[spark] class BinaryFileRDD[T](
case _ =>
}
val jobContext = new JobContextImpl(conf, jobId)
-    inputFormat.setMinPartitions(jobContext, minPartitions)
+    inputFormat.setMinPartitions(sc, jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
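BinaryFileRDD backs SparkContext.binaryFiles, so the change surfaces to end users mainly through partition counts. A small usage sketch, with a hypothetical HDFS path; the minPartitions hint is still accepted, but split packing now follows the byte-based rule above:

// Sketch: reading whole binary files; each record pairs a path with a reusable data stream.
val blobs = sc.binaryFiles("hdfs:///data/blobs", minPartitions = 4)
val sizes = blobs.mapValues(stream => stream.toArray().length)  // materialize each file's bytes
println(sizes.getNumPartitions)                                 // reflects the byte-based packing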
16 changes: 16 additions & 0 deletions docs/configuration.md
@@ -1034,6 +1034,22 @@ Apart from these, the following properties are also available, and may be useful
its contents do not match those of the source.
</td>
</tr>
<tr>
<td><code>spark.files.maxPartitionBytes</code></td>
<td>134217728 (128 MB)</td>
<td>
The maximum number of bytes to pack into a single partition when reading files.
</td>
</tr>
<tr>
<td><code>spark.files.openCostInBytes</code></td>
<td>4194304 (4 MB)</td>
<td>
    The estimated cost to open a file, measured by the number of bytes that could be scanned in the
    same time. This is used when putting multiple files into a partition. It is better to
    overestimate; then the partitions with small files will be faster than partitions with bigger
    files.
</td>
</tr>
<tr>
<td><code>spark.hadoop.cloneConf</code></td>
<td>false</td>
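Like other spark.files.* properties, these can be supplied at submit time or inspected from a running application. A minimal sketch of reading the effective values back, for example in spark-shell, with the table's defaults used as fallbacks:

// Sketch: checking the effective settings from an existing SparkContext.
val maxPartitionBytes = sc.getConf.get("spark.files.maxPartitionBytes", "134217728")
val openCost = sc.getConf.get("spark.files.openCostInBytes", "4194304")
println(s"maxPartitionBytes=$maxPartitionBytes, openCostInBytes=$openCost")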