
Commit b2e358e

Merge remote-tracking branch 'upstream/master' into UDAF

2 parents 6edb5ac + 812b63b

44 files changed: +908 -346 lines


core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 30 additions & 38 deletions
@@ -20,7 +20,8 @@ package org.apache.spark
 import java.io.{ObjectInputStream, Serializable}
 
 import scala.collection.generic.Growable
-import scala.collection.mutable.Map
+import scala.collection.Map
+import scala.collection.mutable
 import scala.ref.WeakReference
 import scala.reflect.ClassTag
 
@@ -39,25 +40,44 @@ import org.apache.spark.util.Utils
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
  * @param name human-readable name for use in Spark's web UI
+ * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
+ *                 to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
+ *                 thread safe so that they can be reported correctly.
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
-class Accumulable[R, T] (
+class Accumulable[R, T] private[spark] (
     @transient initialValue: R,
     param: AccumulableParam[R, T],
-    val name: Option[String])
+    val name: Option[String],
+    internal: Boolean)
   extends Serializable {
 
+  private[spark] def this(
+      @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
+    this(initialValue, param, None, internal)
+  }
+
+  def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
+    this(initialValue, param, name, false)
+
   def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
     this(initialValue, param, None)
 
   val id: Long = Accumulators.newId
 
-  @transient private var value_ = initialValue // Current value on master
+  @volatile @transient private var value_ : R = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
   private var deserialized = false
 
-  Accumulators.register(this, true)
+  Accumulators.register(this)
+
+  /**
+   * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
+   * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
+   * reported correctly.
+   */
+  private[spark] def isInternal: Boolean = internal
 
   /**
    * Add more data to this accumulator / accumulable
@@ -132,7 +152,8 @@ class Accumulable[R, T] (
     in.defaultReadObject()
     value_ = zero
     deserialized = true
-    Accumulators.register(this, false)
+    val taskContext = TaskContext.get()
+    taskContext.registerAccumulator(this)
   }
 
   override def toString: String = if (value_ == null) "null" else value_.toString
@@ -284,16 +305,7 @@ private[spark] object Accumulators extends Logging {
   * It keeps weak references to these objects so that accumulators can be garbage-collected
   * once the RDDs and user-code that reference them are cleaned up.
   */
-  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
-
-  /**
-   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
-   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
-   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
-   */
-  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
-    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
-  }
+  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
 
   private var lastId: Long = 0
 
@@ -302,19 +314,8 @@ private[spark] object Accumulators extends Logging {
     lastId
   }
 
-  def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
-    if (original) {
-      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
-    } else {
-      localAccums.get()(a.id) = a
-    }
-  }
-
-  // Clear the local (non-original) accumulators for the current thread
-  def clear() {
-    synchronized {
-      localAccums.get.clear()
-    }
+  def register(a: Accumulable[_, _]): Unit = synchronized {
+    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
   }
 
   def remove(accId: Long) {
@@ -323,15 +324,6 @@ private[spark] object Accumulators extends Logging {
     }
   }
 
-  // Get the values of the local accumulators for the current thread (by ID)
-  def values: Map[Long, Any] = synchronized {
-    val ret = Map[Long, Any]()
-    for ((id, accum) <- localAccums.get) {
-      ret(id) = accum.localValue
-    }
-    return ret
-  }
-
  // Add values to the original accumulators with some given IDs
  def add(values: Map[Long, Any]): Unit = synchronized {
    for ((id, value) <- values) {
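To make the new flow concrete, here is a minimal self-contained sketch (toy classes, not Spark's real ones) of the registration scheme this file now uses: the driver keeps weak references in `originals` so accumulators can be garbage-collected, while each deserialized copy on an executor registers with its own task context instead of a shared thread-local map.

import scala.collection.mutable
import scala.ref.WeakReference

// Toy stand-ins, not Spark's classes: a sketch of the registration flow above.
class ToyAccum(val id: Long, var value: Long)

object ToyDriverRegistry {
  // Driver side: weak references, mirroring `Accumulators.originals`.
  val originals = mutable.Map[Long, WeakReference[ToyAccum]]()

  def register(a: ToyAccum): Unit = synchronized { originals(a.id) = new WeakReference(a) }

  // Merge per-task updates into whichever originals are still alive.
  def add(updates: Map[Long, Long]): Unit = synchronized {
    for ((id, delta) <- updates; ref <- originals.get(id); acc <- ref.get) {
      acc.value += delta
    }
  }
}

class ToyTaskContext {
  // Executor side: per-task map, mirroring `TaskContextImpl.accumulators`.
  private val accums = mutable.HashMap[Long, ToyAccum]()
  def registerAccumulator(a: ToyAccum): Unit = synchronized { accums(a.id) = a }
  def collectAccumulators(): Map[Long, Long] = synchronized { accums.mapValues(_.value).toMap }
}

Because every task owns its own context map, there is no global state to clean up when a task finishes, which is why `Accumulators.clear()` and the thread-local `localAccums` disappear in this commit.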

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 3 additions & 1 deletion
@@ -181,7 +181,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       // Asynchronously kill the executor to avoid blocking the current thread
       killExecutorThread.submit(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
-          sc.killExecutor(executorId)
+          // Note: we want to get an executor back after expiring this one,
+          // so do not simply call `sc.killExecutor` here (SPARK-8119)
+          sc.killAndReplaceExecutor(executorId)
         }
       })
     }

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 38 additions & 2 deletions
@@ -1419,6 +1419,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
+   *
+   * Note: This is an indication to the cluster manager that the application wishes to adjust
+   * its resource usage downwards. If the application wishes to replace the executors it kills
+   * through this method with new ones, it should follow up explicitly with a call to
+   * {{SparkContext#requestExecutors}}.
+   *
    * This is currently only supported in YARN mode. Return whether the request is received.
    */
   @DeveloperApi
@@ -1436,12 +1442,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * :: DeveloperApi ::
-   * Request that cluster manager the kill the specified executor.
-   * This is currently only supported in Yarn mode. Return whether the request is received.
+   * Request that the cluster manager kill the specified executor.
+   *
+   * Note: This is an indication to the cluster manager that the application wishes to adjust
+   * its resource usage downwards. If the application wishes to replace the executor it kills
+   * through this method with a new one, it should follow up explicitly with a call to
+   * {{SparkContext#requestExecutors}}.
+   *
+   * This is currently only supported in YARN mode. Return whether the request is received.
    */
   @DeveloperApi
   override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
 
+  /**
+   * Request that the cluster manager kill the specified executor without adjusting the
+   * application resource requirements.
+   *
+   * The effect is that a new executor will be launched in place of the one killed by
+   * this request. This assumes the cluster manager will automatically and eventually
+   * fulfill all missing application resource requests.
+   *
+   * Note: The replace is by no means guaranteed; another application on the same cluster
+   * can steal the window of opportunity and acquire this application's resources in the
+   * mean time.
+   *
+   * This is currently only supported in YARN mode. Return whether the request is received.
+   */
+  private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutors(Seq(executorId), replace = true)
+      case _ =>
+        logWarning("Killing executors is only supported in coarse-grained mode")
+        false
+    }
+  }
+
   /** The version of Spark on which this application is running. */
   def version: String = SPARK_VERSION
 
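The distinction that matters here is what happens to the application's resource target. Below is a rough model of the `replace` flag's semantics (a toy class, not Spark's `CoarseGrainedSchedulerBackend`, which additionally tracks pending requests and talks to the cluster manager; the usage lines are meant for a REPL or worksheet):

// Toy model: killing with replace = true leaves the desired executor count
// unchanged, so the cluster manager still "owes" us a replacement;
// replace = false is a genuine downscale.
class ToyBackend(var targetExecutors: Int) {
  private var live = Set.empty[String]

  def registerExecutor(id: String): Unit = { live += id }

  def killExecutors(ids: Seq[String], replace: Boolean): Boolean = {
    live --= ids
    if (!replace) targetExecutors -= ids.size
    true
  }

  // Executors the cluster manager should eventually launch to meet the target.
  def missing: Int = targetExecutors - live.size
}

val backend = new ToyBackend(targetExecutors = 2)
backend.registerExecutor("exec-1")
backend.registerExecutor("exec-2")
backend.killExecutors(Seq("exec-1"), replace = true)   // kill-and-replace: missing == 1
backend.killExecutors(Seq("exec-2"), replace = false)  // plain kill: target drops to 1, missing == 1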

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 18 additions & 0 deletions
@@ -152,4 +152,22 @@ abstract class TaskContext extends Serializable {
    * Returns the manager for this task's managed memory.
    */
   private[spark] def taskMemoryManager(): TaskMemoryManager
+
+  /**
+   * Register an accumulator that belongs to this task. Accumulators must call this method when
+   * deserializing in executors.
+   */
+  private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit
+
+  /**
+   * Return the local values of internal accumulators that belong to this task. The key of the Map
+   * is the accumulator id and the value of the Map is the latest accumulator local value.
+   */
+  private[spark] def collectInternalAccumulators(): Map[Long, Any]
+
+  /**
+   * Return the local values of accumulators that belong to this task. The key of the Map is the
+   * accumulator id and the value of the Map is the latest accumulator local value.
+   */
+  private[spark] def collectAccumulators(): Map[Long, Any]
 }

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 16 additions & 3 deletions
@@ -17,12 +17,12 @@
 
 package org.apache.spark
 
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}
 
-import scala.collection.mutable.ArrayBuffer
-
 private[spark] class TaskContextImpl(
     val stageId: Int,
     val partitionId: Int,
@@ -94,5 +94,18 @@ private[spark] class TaskContextImpl(
   override def isRunningLocally(): Boolean = runningLocally
 
   override def isInterrupted(): Boolean = interrupted
-}
 
+  @transient private val accumulators = new HashMap[Long, Accumulable[_, _]]
+
+  private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = synchronized {
+    accumulators(a.id) = a
+  }
+
+  private[spark] override def collectInternalAccumulators(): Map[Long, Any] = synchronized {
+    accumulators.filter(_._2.isInternal).mapValues(_.localValue).toMap
+  }
+
+  private[spark] override def collectAccumulators(): Map[Long, Any] = synchronized {
+    accumulators.mapValues(_.localValue).toMap
+  }
+}
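One subtlety in `collectAccumulators()` and `collectInternalAccumulators()`: on Scala 2.x, `mapValues` returns a lazy view over the underlying mutable map, so the trailing `.toMap` is what actually snapshots the values while still inside the `synchronized` block. A standalone illustration of that behavior (toy code, unrelated to Spark's classes):

import scala.collection.mutable

object MapValuesDemo extends App {
  val underlying = mutable.HashMap(1L -> 10)
  val view = underlying.mapValues(_ + 0)           // lazy: re-reads `underlying` on access
  val snapshot = underlying.mapValues(_ + 0).toMap // strict: copied out right now

  underlying(1L) = 99
  println(view(1L))     // 99 -- the view sees the later mutation
  println(snapshot(1L)) // 10 -- the snapshot was fixed when .toMap ran
}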

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 4 deletions
@@ -209,7 +209,7 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
-        val value = try {
+        val (value, accumUpdates) = try {
           task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
         } finally {
           // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
@@ -247,7 +247,6 @@
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
 
-        val accumUpdates = Accumulators.values
         val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
         val serializedDirectResult = ser.serialize(directResult)
         val resultSize = serializedDirectResult.limit
@@ -314,8 +313,6 @@
         env.shuffleMemoryManager.releaseMemoryForThisThread()
         // Release memory used by this thread for unrolling blocks
         env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
-        // Release memory used by this thread for accumulators
-        Accumulators.clear()
         runningTasks.remove(taskId)
       }
     }
@@ -424,6 +421,7 @@
           metrics.updateShuffleReadMetrics()
           metrics.updateInputMetrics()
           metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
+          metrics.updateAccumulators()
 
           if (isLocal) {
             // JobProgressListener will hold an reference of it during

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 16 additions & 0 deletions
@@ -223,6 +223,22 @@ class TaskMetrics extends Serializable {
     // overhead.
     _hostname = TaskMetrics.getCachedHostName(_hostname)
   }
+
+  private var _accumulatorUpdates: Map[Long, Any] = Map.empty
+  @transient private var _accumulatorsUpdater: () => Map[Long, Any] = null
+
+  private[spark] def updateAccumulators(): Unit = synchronized {
+    _accumulatorUpdates = _accumulatorsUpdater()
+  }
+
+  /**
+   * Return the latest updates of accumulators in this task.
+   */
+  def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates
+
+  private[spark] def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = {
+    _accumulatorsUpdater = accumulatorsUpdater
+  }
 }
 
 private[spark] object TaskMetrics {
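The pattern here is a stored thunk: the task installs a `() => Map[Long, Any]` callback (kept `@transient` since it is only meaningful on the executor), and `updateAccumulators()` snapshots its result whenever metrics are about to be reported. A minimal model of that shape (toy class, not the real `TaskMetrics`):

// Minimal model of the updater-callback pattern used by TaskMetrics.
class ToyMetrics {
  private var accumulatorUpdates: Map[Long, Any] = Map.empty
  @transient private var accumulatorsUpdater: () => Map[Long, Any] = () => Map.empty

  def setAccumulatorsUpdater(updater: () => Map[Long, Any]): Unit = {
    accumulatorsUpdater = updater
  }

  // Called when metrics are about to be reported (e.g. from the heartbeat path).
  def updateAccumulators(): Unit = synchronized {
    accumulatorUpdates = accumulatorsUpdater()
  }

  def latestAccumulatorUpdates: Map[Long, Any] = accumulatorUpdates
}

Storing a callback rather than a value keeps the metrics object decoupled from the task context: the snapshot is taken only when someone actually asks for it.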

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 1 deletion
@@ -22,7 +22,8 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
+import scala.collection.Map
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack}
 import scala.concurrent.duration._
 import scala.language.existentials
 import scala.language.postfixOps
@@ -556,6 +557,9 @@ class DAGScheduler(
       case JobFailed(exception: Exception) =>
         logInfo("Job %d failed: %s, took %f s".format
           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
+        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
+        val callerStackTrace = Thread.currentThread().getStackTrace.tail
+        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
         throw exception
     }
   }
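The SPARK-8644 change is a generally useful trick: an exception surfaced from the scheduler's event loop only carries that thread's frames, so the caller's frames are appended before rethrowing to show where the job was actually submitted from. A self-contained sketch of the same idea (hypothetical helper name, not part of Spark):

object StackTraceUtil {
  // Append the current (caller) thread's frames to an exception produced on another
  // thread, then rethrow it; `.tail` drops the getStackTrace frame itself.
  def rethrowWithCallerTrace(e: Exception): Nothing = {
    val callerStackTrace = Thread.currentThread().getStackTrace.tail
    e.setStackTrace(e.getStackTrace ++ callerStackTrace)
    throw e
  }
}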

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
-import scala.collection.mutable.Map
+import scala.collection.Map
 import scala.language.existentials
 
 import org.apache.spark._

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 10 additions & 3 deletions
@@ -45,14 +45,20 @@ import org.apache.spark.util.Utils
  */
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 
+  /**
+   * The key of the Map is the accumulator id and the value of the Map is the latest accumulator
+   * local value.
+   */
+  type AccumulatorUpdates = Map[Long, Any]
+
   /**
    * Called by [[Executor]] to run this task.
    *
    * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
   * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
-   * @return the result of the task
+   * @return the result of the task along with updates of Accumulators.
    */
-  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
+  final def run(taskAttemptId: Long, attemptNumber: Int): (T, AccumulatorUpdates) = {
     context = new TaskContextImpl(
       stageId = stageId,
       partitionId = partitionId,
@@ -62,12 +68,13 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
       runningLocally = false)
     TaskContext.setTaskContext(context)
     context.taskMetrics.setHostname(Utils.localHostName())
+    context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
     taskThread = Thread.currentThread()
     if (_killed) {
       kill(interruptThread = false)
     }
     try {
-      runTask(context)
+      (runTask(context), context.collectAccumulators())
     } finally {
       context.markTaskCompleted()
       TaskContext.unset()
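Taken together, the new contract is that `run()` wires the context's internal-accumulator collector into `TaskMetrics`, executes the task body, and returns the result paired with every accumulator value collected from the context; this pair is what `Executor` destructures into `(value, accumUpdates)` above. A toy end-to-end sketch of that shape, with stand-in types (not Spark's `Task` or `TaskContextImpl`):

import scala.collection.mutable

// Stand-ins just to show the (result, updates) shape returned by run().
object ToyTaskRun {
  type AccumulatorUpdates = Map[Long, Any]

  class Context {
    private val accums = mutable.HashMap[Long, Any]()
    def registerAccumulator(id: Long, value: Any): Unit = { accums(id) = value }
    def collectAccumulators(): AccumulatorUpdates = accums.toMap
  }

  def run[T](body: Context => T): (T, AccumulatorUpdates) = {
    val context = new Context
    try {
      (body(context), context.collectAccumulators())
    } finally {
      // markTaskCompleted() / TaskContext.unset() would go here in the real code.
    }
  }

  def main(args: Array[String]): Unit = {
    val (value, accumUpdates) = run { ctx =>
      ctx.registerAccumulator(1L, 42L)
      "done"
    }
    println(s"$value, $accumUpdates")  // done, Map(1 -> 42)
  }
}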
