From 823aba185c562d08474c1aca846bf8de467beee4 Mon Sep 17 00:00:00 2001
From: fidato
Date: Mon, 3 Oct 2016 00:21:13 +0200
Subject: [PATCH 1/4] [SPARK-16575] [spark core] partition calculation mismatch
 with sc.binaryFiles

---
 .../scala/org/apache/spark/input/PortableDataStream.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index f66510b6f977..110b107eb6fd 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -41,8 +41,9 @@ private[spark] abstract class StreamFileInputFormat[T]
    * which is set through setMaxSplitSize
    */
   def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
-    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
+    val files = listStatus(context).asScala
+    val totalLen = files.filterNot(_.isDirectory).map(_.getLen).sum
+    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, files.size)).toLong
     super.setMaxSplitSize(maxSplitSize)
   }
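
The patch above only changes the divisor: max(minPartitions, 1.0) becomes max(minPartitions, files.size), so the computed split size can never exceed the average file size. A minimal standalone sketch of the difference (not part of the patch; Patch1SplitSizeSketch and its inputs, 100 files of 10 MB with minPartitions = 2, are made-up illustration values):

object Patch1SplitSizeSketch {
  // Old expression: the divisor is floored at 1, ignoring how many files exist.
  def oldMaxSplitSize(fileLengths: Seq[Long], minPartitions: Int): Long =
    math.ceil(fileLengths.sum / math.max(minPartitions, 1.0)).toLong

  // New expression: the divisor is also floored at the number of files, so the
  // split size never exceeds the average file size.
  def newMaxSplitSize(fileLengths: Seq[Long], minPartitions: Int): Long =
    math.ceil(fileLengths.sum.toDouble / math.max(minPartitions, fileLengths.size)).toLong

  def main(args: Array[String]): Unit = {
    val sizes = Seq.fill(100)(10L * 1024 * 1024) // 100 files of 10 MB each
    println(oldMaxSplitSize(sizes, minPartitions = 2)) // 524288000: ~2 oversized splits
    println(newMaxSplitSize(sizes, minPartitions = 2)) // 10485760: roughly one file per split
  }
}

With the old expression a small minPartitions lets CombineFileInputFormat pack thousands of files into just a couple of splits; the new floor of files.size keeps the split size at or below the average file size, so partitioning scales with the number of files.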

From 83a5a7a37d8f58e86e2320fda00a2fee2567dba5 Mon Sep 17 00:00:00 2001
From: fidato
Date: Thu, 13 Oct 2016 21:16:55 +0200
Subject: [PATCH 2/4] [SPARK-16575][CORE] Add the cost of opening a file to the
 BinaryFileRDD partition calculation, similar to Spark SQL

---
 .../apache/spark/input/PortableDataStream.scala   | 13 ++++++++++---
 .../apache/spark/internal/config/package.scala    | 13 +++++++++++++
 .../org/apache/spark/rdd/BinaryFileRDD.scala      |  2 +-
 docs/configuration.md                             | 16 ++++++++++++++++
 4 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 110b107eb6fd..59404e08895a 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
 
+import org.apache.spark.internal.config
+import org.apache.spark.SparkContext
+
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
@@ -40,10 +43,14 @@ private[spark] abstract class StreamFileInputFormat[T]
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
    * which is set through setMaxSplitSize
    */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
+    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
+    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
+    val defaultParallelism = sc.defaultParallelism
     val files = listStatus(context).asScala
-    val totalLen = files.filterNot(_.isDirectory).map(_.getLen).sum
-    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, files.size)).toLong
+    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     super.setMaxSplitSize(maxSplitSize)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 497ca92c7bc6..4a3e3d5c79ef 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -206,4 +206,17 @@ package object config {
       "encountering corrupt files and contents that have been read will still be returned.")
     .booleanConf
     .createWithDefault(false)
+
+  private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
+    .doc("The maximum number of bytes to pack into a single partition when reading files.")
+    .longConf
+    .createWithDefault(128 * 1024 * 1024)
+
+  private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
+    .doc("The estimated cost to open a file, measured by the number of bytes that could be" +
+      " scanned in the same time. This is used when putting multiple files into a partition." +
+      " It's better to over-estimate; then the partitions with small files will be faster" +
+      " than partitions with bigger files.")
+    .longConf
+    .createWithDefault(4 * 1024 * 1024)
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 41832e835474..141f689c22a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -43,7 +43,7 @@ private[spark] class BinaryFileRDD[T](
       case _ =>
     }
     val jobContext = new JobContextImpl(conf, jobId)
-    inputFormat.setMinPartitions(jobContext, minPartitions)
+    inputFormat.setMinPartitions(sc, jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
diff --git a/docs/configuration.md b/docs/configuration.md
index 373e22d71a87..1333d334d416 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1019,6 +1019,22 @@ Apart from these, the following properties are also available, and may be useful
     its contents do not match those of the source.
   </td>
 </tr>
+<tr>
+  <td><code>spark.files.maxPartitionBytes</code></td>
+  <td>134217728 (128 MB)</td>
+  <td>
+    The maximum number of bytes to pack into a single partition when reading files.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.files.openCostInBytes</code></td>
+  <td>4194304 (4 MB)</td>
+  <td>
+    The estimated cost to open a file, measured by the number of bytes that could be scanned in the
+    same time. This is used when putting multiple files into a partition. It is better to
+    over-estimate; then the partitions with small files will be faster than partitions with bigger files.
+  </td>
+</tr>
 <tr>
   <td><code>spark.hadoop.cloneConf</code></td>
   <td>false</td>
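
This patch sizes binaryFiles partitions similarly to Spark SQL, as the commit subject says: each file is charged an estimated open cost, and the resulting split size is clamped between spark.files.openCostInBytes and spark.files.maxPartitionBytes. A standalone sketch of that formula (not part of the patch; SplitSizeSketch, the 64 KB file sizes and defaultParallelism = 16 are made-up values, while the two defaults mirror the config entries added above):

object SplitSizeSketch {
  def maxSplitSize(
      fileLengths: Seq[Long],
      defaultParallelism: Int,
      defaultMaxSplitBytes: Long = 128L * 1024 * 1024, // spark.files.maxPartitionBytes default
      openCostInBytes: Long = 4L * 1024 * 1024): Long = { // spark.files.openCostInBytes default
    // Charge every file its length plus the estimated cost of opening it.
    val totalBytes = fileLengths.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    // Clamp between the open cost and the configured maximum partition size.
    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // 10,000 files of 64 KB each on defaultParallelism = 16: bytesPerCore is roughly
    // 2.6 GB, so the split size is capped at the 128 MB maximum.
    val sizes = Seq.fill(10000)(64L * 1024)
    println(maxSplitSize(sizes, defaultParallelism = 16)) // 134217728
  }
}

The open cost mainly matters when there are few bytes overall: it keeps bytesPerCore, and therefore the split size, from shrinking toward zero, while the 128 MB cap bounds how much a single partition can be asked to hold.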

From 7b57840757006f6f727a133f8e2544e2fb0a843f Mon Sep 17 00:00:00 2001
From: fidato
Date: Sun, 16 Oct 2016 18:03:52 +0200
Subject: [PATCH 3/4] Fix for failing test case.

---
 core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 141f689c22a3..b30c51b57a7a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.input.StreamFileInputFormat
 
 private[spark] class BinaryFileRDD[T](
-    sc: SparkContext,
+    @transient sc: SparkContext,
     inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
     keyClass: Class[String],
     valueClass: Class[T],

From 53d9d27b9aeeb91f02ee3900c631a70aa5090776 Mon Sep 17 00:00:00 2001
From: fidato
Date: Thu, 3 Nov 2016 20:14:25 +0100
Subject: [PATCH 4/4] Targets with meta-annotations (SPARK-12527) for the
 SparkContext in BinaryFileRDD

---
 core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index b30c51b57a7a..50d977a92da5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.input.StreamFileInputFormat
 
 private[spark] class BinaryFileRDD[T](
-    @transient sc: SparkContext,
+    @transient private val sc: SparkContext,
     inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
     keyClass: Class[String],
    valueClass: Class[T],
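
PATCH 3 and PATCH 4 only change how the SparkContext constructor parameter is annotated; PATCH 4 switches to the @transient private val idiom Spark uses elsewhere so the annotation is attached to an explicit field of BinaryFileRDD rather than to a bare constructor parameter (the meta-annotation concern tracked by SPARK-12527). A minimal sketch of what @transient buys (not part of the patches; Config, Holder and TransientSketch are made-up stand-ins, and the field here lives in the class body rather than in the constructor):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Deliberately not Serializable, standing in for a driver-only object like SparkContext.
class Config { val label = "driver-only" }

class Holder(val payload: Array[Byte]) extends Serializable {
  @transient private val conf: Config = new Config // skipped by Java serialization
}

object TransientSketch {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    // Succeeds because the transient field is not written; without @transient this
    // would throw java.io.NotSerializableException for Config.
    out.writeObject(new Holder(Array[Byte](1, 2, 3)))
    out.close()
    println(s"serialized ${bytes.size()} bytes")
  }
}

After deserialization a transient field comes back as null, which is fine here: the SparkContext is only needed on the driver while the RDD computes its partitions, not on the executors.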