
Commit b627a6a

merge master

2 parents: 51fb3d6 + b45c13e

38 files changed: +167, -44 lines

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 4 additions & 2 deletions
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)

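With this change, the output-directory existence check in saveAsHadoopFile / saveAsNewAPIHadoopFile runs only when spark.hadoop.validateOutputSpecs is true (the default). A minimal driver-side sketch of opting out, mirroring the new FileSuite tests below; the app name and output path are illustrative, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative driver: with validation disabled, saving into a directory
// that already exists no longer throws, restoring pre-1.0 behavior.
val conf = new SparkConf()
  .setAppName("output-validation-demo") // hypothetical app name
  .setMaster("local")
  .set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), 1)
rdd.saveAsTextFile("/tmp/output") // illustrative path
rdd.saveAsTextFile("/tmp/output") // second save into the same directory now succeeds
sc.stop()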
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 6 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Clock, Utils}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -61,7 +61,8 @@ class DAGScheduler(
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
-    env: SparkEnv)
+    env: SparkEnv,
+    clock: Clock = SystemClock)
   extends Logging {
 
   import DAGScheduler._
@@ -781,7 +782,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + myPending)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -807,11 +808,11 @@ class DAGScheduler(
 
     def markStageAsFinished(stage: Stage) = {
       val serviceTime = stageToInfos(stage).submissionTime match {
-        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
         case _ => "Unknown"
       }
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-      stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).completionTime = Some(clock.getTime())
       listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       runningStages -= stage
     }
@@ -1015,7 +1016,7 @@ class DAGScheduler(
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
-    stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(failedStage).completionTime = Some(clock.getTime())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",

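The DAGScheduler now takes a Clock parameter (defaulting to SystemClock) instead of calling System.currentTimeMillis() directly, so stage submission and completion times can be controlled from tests. A rough sketch of what such an abstraction looks like, assuming a single getTime method; the real org.apache.spark.util.Clock is not shown in this diff and may differ in detail:

// Assumed shape of the clock abstraction injected above.
trait Clock {
  def getTime(): Long
}

// Production clock: just the wall clock.
object SystemClock extends Clock {
  override def getTime(): Long = System.currentTimeMillis()
}

// Hypothetical test clock: time only advances when the test says so, making
// stage submissionTime / completionTime values deterministic.
class ManualClock(private var now: Long = 0L) extends Clock {
  override def getTime(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}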
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 4 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
 import java.util.Comparator
 
+import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
     private val sortedMap = currentMap.destructiveSortedIterator(comparator)
-    private val inputStreams = Seq(sortedMap) ++ spilledMaps
+    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
      * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
      * Assume the given iterator is in sorted order.
      */
-    private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
         val minHash = kc._1.hashCode()
-        while (it.hasNext && kc._1.hashCode() == minHash) {
+        while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
         }
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
      *
      * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
      */
-    private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+    private class StreamBuffer(
+        val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def isEmpty = pairs.length == 0

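The fix in getMorePairs is to peek at the next pair via BufferedIterator.head before consuming it. The old loop tested the hash of the pair it had already consumed (kc), so it always pulled one pair from the next hash group into the current buffer; with many collisions this could leave matching keys unmerged (SPARK-2043). A self-contained sketch of the peeking pattern on a hash-sorted iterator, independent of the map internals; takeSameHashGroup is an illustrative helper, not part of Spark:

import scala.collection.BufferedIterator
import scala.collection.mutable.ArrayBuffer

// Collect the leading pairs that share the first key's hash code, peeking
// with `head` so nothing from the next hash group is consumed by mistake.
def takeSameHashGroup[K, V](it: BufferedIterator[(K, V)]): ArrayBuffer[(K, V)] = {
  val group = new ArrayBuffer[(K, V)]
  if (it.hasNext) {
    val minHash = it.head._1.hashCode()
    while (it.hasNext && it.head._1.hashCode() == minHash) {
      group += it.next()
    }
  }
  group
}

// "Aa" and "BB" are classic colliding String keys (same hashCode); "c" is not.
val pairs = Seq(("Aa", 1), ("BB", 2), ("c", 3)).iterator.buffered
println(takeSameHashGroup(pairs)) // ArrayBuffer((Aa,1), (BB,2))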
core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 22 additions & 0 deletions
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (old Hadoop API") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+  }
+
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (new Hadoop API") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+  }
+
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)

core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala

Lines changed: 38 additions & 1 deletion
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       ("pomatoes", "eructation") // 568647356
     )
 
+    collisionPairs.foreach { case (w1, w2) =>
+      // String.hashCode is documented to use a specific algorithm, but check just in case
+      assert(w1.hashCode === w2.hashCode)
+    }
+
     (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
     collisionPairs.foreach { case (w1, w2) =>
       map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(kv._2.equals(expectedValue))
       count += 1
     }
-    assert(count == 100000 + collisionPairs.size * 2)
+    assert(count === 100000 + collisionPairs.size * 2)
+  }
+
+  test("spilling with many hash collisions") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.0001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+
+    // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
+    // problems if the map fails to group together the objects with the same code (SPARK-2043).
+    for (i <- 1 to 10) {
+      for (j <- 1 to 10000) {
+        map.insert(FixedHashObject(j, j % 2), 1)
+      }
+    }
+
+    val it = map.iterator
+    var count = 0
+    while (it.hasNext) {
+      val kv = it.next()
+      assert(kv._2 === 10)
+      count += 1
+    }
+    assert(count === 10000)
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@
     }
   }
 }
+
+/**
+ * A dummy class that always returns the same hash code, to easily test hash collisions
+ */
+case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
+  override def hashCode(): Int = h
+}

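FixedHashObject gives the suite distinct keys whose hash codes it fully controls, which is the hard case for a merge ordered by key hash: equality must still tell the keys apart. A small illustration of the collision it manufactures (the values here are arbitrary):

// Distinct keys, identical hash: hashCode comes from the second field only,
// while case-class equality still compares both fields.
val a = FixedHashObject(1, 0)
val b = FixedHashObject(2, 0)
assert(a.hashCode == b.hashCode) // forced collision
assert(a != b)                   // but still different keys that must merge separately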
dev/mima

Lines changed: 1 addition & 0 deletions
@@ -31,4 +31,5 @@ if [ $ret_val != 0 ]; then
   echo "NOTE: Exceptions to binary compatibility can be added in project/MimaExcludes.scala"
 fi
 
+rm -f .generated-mima-excludes
 exit $ret_val

docs/configuration.md

Lines changed: 8 additions & 0 deletions
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
   this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+  <td>spark.hadoop.validateOutputSpecs</td>
+  <td>true</td>
+  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+  used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+  output directories. We recommend that users do not disable this except if trying to achieve compatibility with
+  previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+</tr>
 </table>
 
 #### Networking

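As the new configuration.md entry suggests, the recommended alternative to disabling validation is to clear the stale output directory yourself with Hadoop's FileSystem API. A brief sketch, assuming an existing SparkContext `sc` and RDD `rdd`; the output path is illustrative:

import org.apache.hadoop.fs.{FileSystem, Path}

// Delete any previous output before saving, instead of turning off
// spark.hadoop.validateOutputSpecs.
val outputPath = new Path("/tmp/output") // illustrative path
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true) // recursive delete
}
rdd.saveAsTextFile(outputPath.toString)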