
Commit 41efde0

Merge branch 'master' into kerberos-longrunning
2 parents d282d7a + 7683982 commit 41efde0

117 files changed: +3023 −1804 lines changed


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 26 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private val intervalMillis: Long = 100
 
   // Clock used to schedule when executors should be added and removed
-  private var clock: Clock = new RealClock
+  private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
 }
-
-/**
- * An abstract clock for measuring elapsed time.
- */
-private trait Clock {
-  def getTimeMillis: Long
-}
-
-/**
- * A clock backed by a monotonically increasing time source.
- * The time returned by this clock does not correspond to any notion of wall-clock time.
- */
-private class RealClock extends Clock {
-  override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
-}
-
-/**
- * A clock that allows the caller to customize the time.
- * This is used mainly for testing.
- */
-private class TestClock(startTimeMillis: Long) extends Clock {
-  private var time: Long = startTimeMillis
-  override def getTimeMillis: Long = time
-  def tick(ms: Long): Unit = { time += ms }
-}
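The removed Clock, RealClock and TestClock helpers are superseded by the shared clock utilities in org.apache.spark.util, which this file now imports. The call sites in this commit only rely on a getTimeMillis() method and a new SystemClock() default, so the sketch below is a minimal, self-contained illustration of that injection pattern; the ManualClock and RetryTimer names are illustrative assumptions, not classes taken from the merge.

// Minimal sketch of the clock-injection pattern this merge converges on.
// Only getTimeMillis() and SystemClock are grounded in the diff; the rest is illustrative.
trait Clock {
  def getTimeMillis(): Long
}

// Wall-clock implementation, matching the production default `new SystemClock()`.
class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

// Controllable clock for tests, playing the role of the removed TestClock.
class ManualClock(private var time: Long) extends Clock {
  override def getTimeMillis(): Long = time
  def advance(ms: Long): Unit = { time += ms }
}

// A component taking the clock as a constructor default, mirroring how
// ExecutorAllocationManager, DriverRunner, DAGScheduler and TaskSetManager now do it.
class RetryTimer(clock: Clock = new SystemClock()) {
  private val start = clock.getTimeMillis()
  def elapsedMillis(): Long = clock.getTimeMillis() - start
}

object ClockSketchDemo {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock(0L)
    val timer = new RetryTimer(clock)
    clock.advance(5000L)
    println(timer.elapsedMillis())  // prints 5000, deterministically
  }
}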

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 0 deletions
@@ -548,6 +548,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
    * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
    * modified collection. Pass a copy of the argument to avoid this.
+   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
+   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     assertNotStopped()
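A short usage sketch of the two alternatives the new note recommends, assuming a live SparkContext bound to `sc` (as in spark-shell); the value names are illustrative:

// Assumes `sc: SparkContext` is already available.
val noPartitions = sc.emptyRDD[Int]            // RDD[Int] with zero partitions
val emptyParts   = sc.parallelize(Seq[Int]())  // RDD[Int] whose partitions are all empty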

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 5 additions & 12 deletions
@@ -20,19 +20,18 @@ package org.apache.spark.deploy.worker
 import java.io._
 
 import scala.collection.JavaConversions._
-import scala.collection.Map
 
 import akka.actor.ActorRef
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileUtil, Path}
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
+import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
+import org.apache.spark.util.{Clock, SystemClock}
 
 /**
  * Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -59,9 +58,7 @@ private[spark] class DriverRunner(
   // Decoupled for testing
   private[deploy] def setClock(_clock: Clock) = clock = _clock
   private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
-  private var clock = new Clock {
-    def currentTimeMillis(): Long = System.currentTimeMillis()
-  }
+  private var clock: Clock = new SystemClock()
   private var sleeper = new Sleeper {
     def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
   }
@@ -190,9 +187,9 @@ private[spark] class DriverRunner(
         initialize(process.get)
       }
 
-      val processStart = clock.currentTimeMillis()
+      val processStart = clock.getTimeMillis()
       val exitCode = process.get.waitFor()
-      if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
         waitSeconds = 1
       }
 
@@ -208,10 +205,6 @@ private[spark] class DriverRunner(
   }
 }
 
-private[deploy] trait Clock {
-  def currentTimeMillis(): Long
-}
-
 private[deploy] trait Sleeper {
   def sleep(seconds: Int)
 }

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 7 additions & 0 deletions
@@ -26,10 +26,17 @@ import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLCla
 
 /**
  * Utility object for launching driver programs such that they share fate with the Worker process.
+ * This is used in standalone cluster mode only.
  */
 object DriverWrapper {
   def main(args: Array[String]) {
     args.toList match {
+      /*
+       * IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both
+       * backward and forward compatible across future Spark versions. Because this gateway
+       * uses this class to launch the driver, the ordering and semantics of the arguments
+       * here must also remain consistent across versions.
+       */
       case workerUrl :: userJar :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 5 additions & 1 deletion
@@ -105,7 +105,11 @@ private[spark] class ExecutorRunner(
       workerThread.interrupt()
       workerThread = null
       state = ExecutorState.KILLED
-      Runtime.getRuntime.removeShutdownHook(shutdownHook)
+      try {
+        Runtime.getRuntime.removeShutdownHook(shutdownHook)
+      } catch {
+        case e: IllegalStateException => None
+      }
     }
   }
 
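The try/catch is needed because Runtime.removeShutdownHook throws IllegalStateException once JVM shutdown has begun, which is exactly when this kill path may itself be running from a shutdown hook. A standalone sketch (plain JVM code, not Spark) that reproduces the exception and the guard:

// Removing a shutdown hook while shutdown is in progress throws IllegalStateException,
// so the removal is wrapped in try/catch, as in the patch above.
object ShutdownHookSketch {
  def main(args: Array[String]): Unit = {
    val placeholder = new Thread(new Runnable { def run(): Unit = () })
    Runtime.getRuntime.addShutdownHook(placeholder)
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      def run(): Unit = {
        try {
          // Shutdown has already started here, so this call throws.
          Runtime.getRuntime.removeShutdownHook(placeholder)
        } catch {
          case _: IllegalStateException => println("shutdown already in progress")
        }
      }
    }))
    // main returns; both hooks run during JVM shutdown.
  }
}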

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 8 additions & 1 deletion
@@ -213,7 +213,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     } else {
       basicBucketFunction _
     }
-    self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    if (self.partitions.length == 0) {
+      new Array[Long](buckets.length - 1)
+    } else {
+      // reduce() requires a non-empty RDD. This works because the mapPartitions will make
+      // non-empty partitions out of empty ones. But it doesn't handle the no-partitions case,
+      // which is below
+      self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    }
   }
 
 }
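With this guard, histogram on an RDD that has no partitions returns an all-zero count array instead of failing inside reduce(). A usage sketch, assuming a SparkContext bound to `sc` (older Spark versions may also need `import org.apache.spark.SparkContext._` for the double-RDD implicits):

// Assumes `sc: SparkContext`. Two buckets: [0, 10) and [10, 20].
val counts = sc.emptyRDD[Double].histogram(Array(0.0, 10.0, 20.0))
// counts: Array[Long] = Array(0, 0)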

core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

Lines changed: 4 additions & 4 deletions
@@ -62,11 +62,11 @@ class JdbcRDD[T: ClassTag](
 
   override def getPartitions: Array[Partition] = {
     // bounds are inclusive, hence the + 1 here and - 1 on end
-    val length = 1 + upperBound - lowerBound
+    val length = BigInt(1) + upperBound - lowerBound
     (0 until numPartitions).map(i => {
-      val start = lowerBound + ((i * length) / numPartitions).toLong
-      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
-      new JdbcPartition(i, start, end)
+      val start = lowerBound + ((i * length) / numPartitions)
+      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
+      new JdbcPartition(i, start.toLong, end.toLong)
     }).toArray
   }
 
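The switch to BigInt avoids 64-bit overflow in the intermediate product `i * length` when the inclusive bound range is very large. A standalone illustration with hypothetical bounds; only the arithmetic mirrors getPartitions:

object JdbcBoundsSketch {
  def main(args: Array[String]): Unit = {
    val lowerBound = 0L
    val upperBound = Long.MaxValue - 1
    val numPartitions = 4
    val i = 2  // third partition

    // Pre-patch style: pure Long arithmetic; i * length wraps around for large ranges.
    val lengthLong = 1 + upperBound - lowerBound
    val startLong = lowerBound + ((i * lengthLong) / numPartitions)

    // Patched style: promote to BigInt before multiplying, narrow back to Long at the end.
    val lengthBig = BigInt(1) + upperBound - lowerBound
    val startBig = (lowerBound + ((i * lengthBig) / numPartitions)).toLong

    println(s"Long arithmetic:   $startLong")  // 0: the product overflowed to -2
    println(s"BigInt arithmetic: $startBig")   // ~4.6e18, the intended partition start
  }
}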

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 7 additions & 0 deletions
@@ -1146,6 +1146,9 @@ abstract class RDD[T: ClassTag](
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
    * the limit.
+   *
+   * @note due to complications in the internal implementation, this method will raise
+   * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = {
     if (num == 0) {
@@ -1258,6 +1261,10 @@ abstract class RDD[T: ClassTag](
   def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
   /**
+   * @note due to complications in the internal implementation, this method will raise an
+   * exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
+   * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
+   * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
    * @return true if and only if the RDD contains no elements at all. Note that an RDD
    * may be empty even when it has at least 1 partition.
    */
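A usage sketch of the pitfall these notes describe, assuming a SparkContext bound to `sc`: parallelize(Seq()) type-checks, but its element type is inferred as Nothing, so the actions named above fail at runtime, while the typed alternatives behave as expected:

// Assumes `sc: SparkContext`.
val untyped = sc.parallelize(Seq())          // inferred as RDD[Nothing]
// untyped.take(1) or untyped.isEmpty() raise an exception at runtime, as the notes warn.

sc.parallelize(Seq[String]()).isEmpty()      // true
sc.emptyRDD[String].isEmpty()                // true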

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 10 deletions
@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: org.apache.spark.util.Clock = SystemClock)
+    clock: Clock = new SystemClock())
   extends Logging {
 
   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
@@ -657,7 +657,7 @@ class DAGScheduler(
       // completion events or stage abort
       stageIdToStage -= s.id
       jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
     }
   }
 
@@ -706,7 +706,7 @@ class DAGScheduler(
       stage.latestInfo.stageFailed(stageFailedMessage)
       listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     }
-    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
   }
 }
 
@@ -745,7 +745,7 @@ class DAGScheduler(
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
     val shouldRunLocally =
       localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
-    val jobSubmissionTime = clock.getTime()
+    val jobSubmissionTime = clock.getTimeMillis()
     if (shouldRunLocally) {
       // Compute very short actions like first() or take() with no parent stages locally.
       listenerBus.post(
@@ -871,7 +871,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stage.latestInfo.submissionTime = Some(clock.getTime())
+      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
@@ -940,12 +940,12 @@ class DAGScheduler(
 
     def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
       val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
         case _ => "Unknown"
       }
       if (errorMessage.isEmpty) {
         logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTime())
+        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
       } else {
         stage.latestInfo.stageFailed(errorMessage.get)
         logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
@@ -971,7 +971,7 @@ class DAGScheduler(
       markStageAsFinished(stage)
       cleanupStateForJobAndIndependentStages(job)
       listenerBus.post(
-        SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
+        SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
     }
 
     // taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1187,7 +1187,7 @@ class DAGScheduler(
     }
     val dependentJobs: Seq[ActiveJob] =
       activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
-    failedStage.latestInfo.completionTime = Some(clock.getTime())
+    failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
     for (job <- dependentJobs) {
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
     }
@@ -1242,7 +1242,7 @@ class DAGScheduler(
     if (ableToCancelStages) {
       job.listener.jobFailed(error)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }
 

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 8 additions & 8 deletions
@@ -51,7 +51,7 @@ private[spark] class TaskSetManager(
     sched: TaskSchedulerImpl,
     val taskSet: TaskSet,
     val maxTaskFailures: Int,
-    clock: Clock = SystemClock)
+    clock: Clock = new SystemClock())
   extends Schedulable with Logging {
 
   val conf = sched.sc.conf
@@ -166,7 +166,7 @@ private[spark] class TaskSetManager(
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
   var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTime() // Time we last launched a task at this level
+  var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
 
   override def schedulableQueue = null
 
@@ -281,7 +281,7 @@ private[spark] class TaskSetManager(
       val failed = failedExecutors.get(taskId).get
 
       return failed.contains(execId) &&
-        clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
     }
 
     false
@@ -428,7 +428,7 @@ private[spark] class TaskSetManager(
     : Option[TaskDescription] =
   {
     if (!isZombie) {
-      val curTime = clock.getTime()
+      val curTime = clock.getTimeMillis()
 
       var allowedLocality = maxLocality
 
@@ -459,7 +459,7 @@ private[spark] class TaskSetManager(
           lastLaunchTime = curTime
         }
         // Serialize and return the task
-        val startTime = clock.getTime()
+        val startTime = clock.getTimeMillis()
         val serializedTask: ByteBuffer = try {
           Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
         } catch {
@@ -674,7 +674,7 @@ private[spark] class TaskSetManager(
       return
     }
     val key = ef.description
-    val now = clock.getTime()
+    val now = clock.getTimeMillis()
    val (printFull, dupCount) = {
      if (recentExceptions.contains(key)) {
        val (dupCount, printTime) = recentExceptions(key)
@@ -706,7 +706,7 @@ private[spark] class TaskSetManager(
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
-      put(info.executorId, clock.getTime())
+      put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
     if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
@@ -821,7 +821,7 @@ private[spark] class TaskSetManager(
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
     if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
-      val time = clock.getTime()
+      val time = clock.getTimeMillis()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
       val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
