11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -609,13 +609,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")

val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
val executorTimeoutThresholdMs =
getTimeAsSeconds("spark.network.timeout", "120s") * 1000
val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
// If spark.executor.heartbeatInterval bigger than spark.network.timeout,
// it will almost always cause ExecutorLostFailure. See SPARK-22754.
require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
}

/**
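The tightened check above now compares both values in milliseconds. As a quick illustration, a user-facing configuration that satisfies it might look like the following sketch (the values are arbitrary examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Hypothetical example: the heartbeat interval must stay strictly below the
// network timeout, otherwise SparkConf's settings validation fails the
// require() shown in the diff above.
val conf = new SparkConf()
  .set("spark.network.timeout", "120s")            // executor timeout threshold
  .set("spark.executor.heartbeatInterval", "10s")  // must be smaller than the timeout
```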
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -499,7 +499,7 @@ class SparkContext(config: SparkConf) extends Logging {

// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
_heartbeater.start()

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
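The driver-side Heartbeater above now takes its interval directly from the typed config entry instead of re-parsing the string key. For readers unfamiliar with the class, here is a minimal sketch of the underlying pattern, a task fired at a fixed interval on a dedicated thread; this is not Spark's actual Heartbeater implementation:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Simplified sketch (not Spark's Heartbeater class): invoke `reportHeartBeat`
// every `intervalMs` milliseconds on a single-threaded scheduler.
class SimpleHeartbeater(intervalMs: Long)(reportHeartBeat: () => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    val task = new Runnable { override def run(): Unit = reportHeartBeat() }
    scheduler.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdown()
}
```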
40 changes: 29 additions & 11 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.concurrent.duration._
import scala.util.control.NonFatal

import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -120,7 +121,7 @@ private[spark] class Executor(
}

// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)

// Whether to monitor killed / interrupted tasks
private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
@@ -147,21 +148,32 @@
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

/**
* When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
* times, it should kill itself. The default value is 60. It means we will retry to send
* heartbeats about 10 minutes because the heartbeat interval is 10s.
*/
private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES)

/**
* Whether to drop empty accumulators from heartbeats sent to the driver. Including the empty
* accumulators (that satisfy isZero) can make the size of the heartbeat message very large.
*/
private val HEARTBEAT_DROP_ZEROES = conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES)

/**
* Interval to send heartbeats, in milliseconds
*/
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)

// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
"executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
"executor-heartbeater", HEARTBEAT_INTERVAL_MS)

// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)

/**
* When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
* times, it should kill itself. The default value is 60. It means we will retry to send
* heartbeats about 10 minutes because the heartbeat interval is 10s.
*/
private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)

/**
* Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
* successful heartbeat will reset it to 0.
@@ -799,15 +811,21 @@
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
val accumulatorsToReport =
if (HEARTBEAT_DROP_ZEROES) {
taskRunner.task.metrics.accumulators().filterNot(_.isZero)
} else {
taskRunner.task.metrics.accumulators()
}
accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
}
}

val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
executorUpdates)
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
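The behavioral core of this change is the filterNot(_.isZero) above: when spark.executor.heartbeat.dropZeroAccumulatorUpdates is enabled (the default), accumulators that still satisfy isZero are left out of the heartbeat payload, shrinking the message. A small self-contained illustration of that filtering outside the executor (the accumulator names here are for demonstration only):

```scala
import org.apache.spark.util.LongAccumulator

// One accumulator has been updated, the other is still zero.
val updated = new LongAccumulator()
updated.add(42L)
val untouched = new LongAccumulator()

val accums = Seq(updated, untouched)
// With the drop-zeroes config enabled, only the non-zero accumulator is reported.
val toReport = accums.filterNot(_.isZero)
assert(toReport == Seq(updated))
```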
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -83,6 +83,20 @@ package object config {
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional

private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES =
ConfigBuilder("spark.executor.heartbeat.dropZeroAccumulatorUpdates")
.internal()
.booleanConf
.createWithDefault(true)

private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
ConfigBuilder("spark.executor.heartbeatInterval")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")

private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES =
ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60)

private[spark] val EXECUTOR_JAVA_OPTIONS =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional

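The three entries above replace string-keyed lookups with typed ConfigEntry constants, so parsing and defaults live in one place. A sketch of how the interval entry is consumed; note that these entries and SparkConf.get(ConfigEntry) are private[spark], so this only compiles inside the org.apache.spark package:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL

// timeConf(TimeUnit.MILLISECONDS) means the string value is parsed once and
// every caller gets milliseconds back, replacing scattered getTimeAsMs calls.
// (Only compiles within the org.apache.spark package: the entry is private[spark].)
val conf = new SparkConf().set("spark.executor.heartbeatInterval", "30s")
val intervalMs: Long = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)  // 30000L
```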
111 changes: 102 additions & 9 deletions core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -21,9 +21,10 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.lang.Thread.UncaughtExceptionHandler
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -33,22 +34,25 @@ import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito.{inOrder, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.Eventually
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.memory.MemoryManager
import org.apache.spark.internal.config._
import org.apache.spark.memory.TestMemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout}
import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription}
import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.UninterruptibleThread
import org.apache.spark.storage.{BlockManager, BlockManagerId}
import org.apache.spark.util.{LongAccumulator, UninterruptibleThread}

class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
class ExecutorSuite extends SparkFunSuite
with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester {

test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") {
// mock some objects to make Executor.launchTask() happy
@@ -252,18 +256,107 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
}
}

test("Heartbeat should drop zero accumulator updates") {
heartbeatZeroAccumulatorUpdateTest(true)
}

test("Heartbeat should not drop zero accumulator updates when the conf is disabled") {
heartbeatZeroAccumulatorUpdateTest(false)
}

private def withHeartbeatExecutor(confs: (String, String)*)
(f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
val conf = new SparkConf
confs.foreach { case (k, v) => conf.set(k, v) }
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
val executor =
new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true)
val executorClass = classOf[Executor]

// Save all heartbeats sent into an ArrayBuffer for verification
val heartbeats = ArrayBuffer[Heartbeat]()
val mockReceiver = mock[RpcEndpointRef]
when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
.thenAnswer(new Answer[HeartbeatResponse] {
override def answer(invocation: InvocationOnMock): HeartbeatResponse = {
val args = invocation.getArguments()
val mock = invocation.getMock
heartbeats += args(0).asInstanceOf[Heartbeat]
HeartbeatResponse(false)
}
})
val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
receiverRef.setAccessible(true)
receiverRef.set(executor, mockReceiver)

f(executor, heartbeats)
}

private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): Unit = {
val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString
withHeartbeatExecutor(c) { (executor, heartbeats) =>
val reportHeartbeat = PrivateMethod[Unit]('reportHeartBeat)

// When no tasks are running, there should be no accumulators sent in heartbeat
executor.invokePrivate(reportHeartbeat())
// invokeReportHeartbeat(executor)
assert(heartbeats.length == 1)
assert(heartbeats(0).accumUpdates.length == 0,
"No updates should be sent when no tasks are running")

// When we start a task with a nonzero accumulator, that should end up in the heartbeat
val metrics = new TaskMetrics()
val nonZeroAccumulator = new LongAccumulator()
nonZeroAccumulator.add(1)
metrics.registerAccumulator(nonZeroAccumulator)

val executorClass = classOf[Executor]
val tasksMap = {
val field =
executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks")
field.setAccessible(true)
field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]]
}
val mockTaskRunner = mock[executor.TaskRunner]
val mockTask = mock[Task[Any]]
when(mockTask.metrics).thenReturn(metrics)
when(mockTaskRunner.taskId).thenReturn(6)
when(mockTaskRunner.task).thenReturn(mockTask)
when(mockTaskRunner.startGCTime).thenReturn(1)
tasksMap.put(6, mockTaskRunner)

executor.invokePrivate(reportHeartbeat())
assert(heartbeats.length == 2)
val updates = heartbeats(1).accumUpdates
assert(updates.length == 1 && updates(0)._1 == 6,
"Heartbeat should only send update for the one task running")
val accumsSent = updates(0)._2.length
assert(accumsSent > 0, "The nonzero accumulator we added should be sent")
if (dropZeroMetrics) {
assert(accumsSent == metrics.accumulators().count(!_.isZero),
"The number of accumulators sent should match the number of nonzero accumulators")
} else {
assert(accumsSent == metrics.accumulators().length,
"The number of accumulators sent should match the number of total accumulators")
}
}
}

private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
val mockEnv = mock[SparkEnv]
val mockRpcEnv = mock[RpcEnv]
val mockMetricsSystem = mock[MetricsSystem]
val mockMemoryManager = mock[MemoryManager]
val mockBlockManager = mock[BlockManager]
when(mockEnv.conf).thenReturn(conf)
when(mockEnv.serializer).thenReturn(serializer)
when(mockEnv.serializerManager).thenReturn(mock[SerializerManager])
when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
when(mockEnv.memoryManager).thenReturn(new TestMemoryManager(conf))
when(mockEnv.closureSerializer).thenReturn(serializer)
when(mockBlockManager.blockManagerId).thenReturn(BlockManagerId("1", "hostA", 1234))
when(mockEnv.blockManager).thenReturn(mockBlockManager)
SparkEnv.set(mockEnv)
mockEnv
}
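The new test drives the private Executor.reportHeartBeat method through ScalaTest's PrivateMethodTester and swaps in a mocked heartbeatReceiverRef via reflection, so each heartbeat can be captured and inspected. A tiny standalone illustration of the PrivateMethodTester pattern used here (the Greeter class is made up for the example):

```scala
import org.scalatest.PrivateMethodTester

class Greeter {
  private def greet(name: String): String = s"hello, $name"
}

object PrivateCallDemo extends PrivateMethodTester {
  // Look up the private method by name, then invoke it on an instance.
  val greet = PrivateMethod[String]('greet)
  val result: String = new Greeter().invokePrivate(greet("spark"))  // "hello, spark"
}
```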
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}
