Changes from all commits (16 commits)
- 2db19a3 [SPARK-14897][CORE] Upgrade Jetty to latest version of 8 (srowen, May 3, 2016)
- bf3c060 [SPARK-14915][CORE] Don't re-queue a task if another attempt has alre… (jasonmoore2k, May 5, 2016)
- a3aa22a [SPARK-14915] Fix incorrect resolution of merge conflict in commit bf… (srowen, May 6, 2016)
- ab00652 [SPARK-13566][CORE] Avoid deadlock between BlockManager and Executor … (cenyuhai, May 6, 2016)
- 518af07 [SPARK-15223][DOCS] fix wrongly named config reference (philipphoffmann, May 9, 2016)
- 1678bff [SPARK-15209] Fix display of job descriptions with single quotes in w… (JoshRosen, May 9, 2016)
- d165486 [SPARK-14495][SQL][1.6] fix resolution failure of having clause with … (xwu0226, May 11, 2016)
- c433c0a [SPARK-13519][CORE] Driver should tell Executor to stop itself when c… (zsxwing, Feb 26, 2016)
- 86bf93e [SPARK-13522][CORE] Executor should kill itself when it's unable to h… (zsxwing, Feb 29, 2016)
- ced71d3 [SPARK-13522][CORE] Fix the exit log place for heartbeat (zsxwing, Feb 29, 2016)
- e2a43d0 [SPARK-15262] Synchronize block manager / scheduler executor state (May 11, 2016)
- fd2da7b [SPARK-15260] Atomically resize memory pools (branch 1.6) (May 12, 2016)
- 7200e6b [SPARK-14261][SQL] Memory leak in Spark Thrift Server (dosoft, May 20, 2016)
- 7ad82b6 [SPARK-15395][CORE] Use getHostString to create RpcAddress (backport … (zsxwing, May 20, 2016)
- 9a18115 [SPARK-15165] [SPARK-15205] [SQL] Introduce place holder for comments… (sarutak, May 20, 2016)
- 0869914 [SPARK-12583][MESOS] Mesos shuffle service: Don't delete shuffle file… (Mar 14, 2016)
@@ -17,69 +17,89 @@

package org.apache.spark.deploy.mesos

-import java.net.SocketAddress
import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

-import scala.collection.mutable
+import scala.collection.JavaConverters._

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
+import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.ThreadUtils

/**
* An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
* It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
*/
-private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
+private[mesos] class MesosExternalShuffleBlockHandler(
+    transportConf: TransportConf,
+    cleanerIntervalS: Long)
extends ExternalShuffleBlockHandler(transportConf, null) with Logging {

-// Stores a map of driver socket addresses to app ids
-private val connectedApps = new mutable.HashMap[SocketAddress, String]
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
+  .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
+
+// Stores a map of app id to app state (timeout value and last heartbeat)
+private val connectedApps = new ConcurrentHashMap[String, AppState]()

protected override def handleMessage(
message: BlockTransferMessage,
client: TransportClient,
callback: RpcResponseCallback): Unit = {
message match {
-case RegisterDriverParam(appId) =>
+case RegisterDriverParam(appId, appState) =>
val address = client.getSocketAddress
logDebug(s"Received registration request from app $appId (remote address $address).")
if (connectedApps.contains(address)) {
val existingAppId = connectedApps(address)
if (!existingAppId.equals(appId)) {
logError(s"A new app '$appId' has connected to existing address $address, " +
s"removing previously registered app '$existingAppId'.")
applicationRemoved(existingAppId, true)
}
val timeout = appState.heartbeatTimeout
logInfo(s"Received registration request from app $appId (remote address $address, " +
s"heartbeat timeout $timeout ms).")
if (connectedApps.containsKey(appId)) {
logWarning(s"Received a registration request from app $appId, but it was already " +
s"registered")
}
-connectedApps(address) = appId
+connectedApps.put(appId, appState)
callback.onSuccess(ByteBuffer.allocate(0))
+case Heartbeat(appId) =>
+val address = client.getSocketAddress
+Option(connectedApps.get(appId)) match {
+case Some(existingAppState) =>
+logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
+s"address $address).")
+existingAppState.lastHeartbeat = System.nanoTime()
+case None =>
+logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " +
+s"address $address, appId '$appId').")
+}
case _ => super.handleMessage(message, client, callback)
}
}

-/**
- * On connection termination, clean up shuffle files written by the associated application.
- */
-override def connectionTerminated(client: TransportClient): Unit = {
-val address = client.getSocketAddress
-if (connectedApps.contains(address)) {
-val appId = connectedApps(address)
-logInfo(s"Application $appId disconnected (address was $address).")
-applicationRemoved(appId, true /* cleanupLocalDirs */)
-connectedApps.remove(address)
-} else {
-logWarning(s"Unknown $address disconnected.")
-}
-}

/** An extractor object for matching [[RegisterDriver]] message. */
private object RegisterDriverParam {
-def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
+def unapply(r: RegisterDriver): Option[(String, AppState)] =
+Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
}

+private object Heartbeat {
+def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
+}
+
+private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
+
+private class CleanerThread extends Runnable {
+override def run(): Unit = {
+val now = System.nanoTime()
+connectedApps.asScala.foreach { case (appId, appState) =>
+if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
+logInfo(s"Application $appId timed out. Removing shuffle files.")
+connectedApps.remove(appId)
+applicationRemoved(appId, true)
+}
+}
+}
+}
}

@@ -93,7 +113,8 @@ private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManage

protected override def newShuffleBlockHandler(
conf: TransportConf): ExternalShuffleBlockHandler = {
-new MesosExternalShuffleBlockHandler(conf)
+val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s")
+new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS)
}
}

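The handler above assumes that each registered driver keeps sending ShuffleServiceHeartbeat messages; otherwise the CleanerThread removes its shuffle files once heartbeatTimeout elapses. A minimal sketch of the sending side, which is not part of this diff (the sendToShuffleService callback and the interval are assumptions):

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical driver-side heartbeater: `sendToShuffleService` stands in for
// whatever RPC call delivers a ShuffleServiceHeartbeat to each registered
// shuffle service. The interval must be well under the registered timeout.
class ShuffleHeartbeater(appId: String, intervalMs: Long)
    (sendToShuffleService: String => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      // Each tick refreshes AppState.lastHeartbeat on the service side.
      override def run(): Unit = sendToShuffleService(appId)
    }, 0, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdownNow()
}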
35 changes: 34 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -114,6 +114,19 @@ private[spark] class Executor(
private val heartbeatReceiverRef =
RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)

+/**
+ * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+ * times, it should kill itself. The default value is 60, which means heartbeats are retried for
+ * about 10 minutes, because the heartbeat interval is 10s.
+ */
+private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+
+/**
+ * Counts heartbeat failures. It should only be accessed in the heartbeat thread. Each
+ * successful heartbeat resets it to 0.
+ */
+private var heartbeatFailures = 0

startDriverHeartbeater()

def launchTask(
@@ -218,6 +231,7 @@
threwException = false
res
} finally {
+val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
Expand All @@ -227,6 +241,17 @@ private[spark] class Executor(
logError(errMsg)
}
}

+if (releasedLocks.nonEmpty) {
+val errMsg =
+s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+releasedLocks.mkString("[", ", ", "]")
+if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+throw new SparkException(errMsg)
+} else {
+logError(errMsg)
+}
+}
}
val taskFinish = System.currentTimeMillis()

@@ -452,8 +477,16 @@
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
+heartbeatFailures = 0
} catch {
-case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
+case NonFatal(e) =>
+logWarning("Issue communicating with driver in heartbeater", e)
+heartbeatFailures += 1
+if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
+logError(s"Exit as unable to send heartbeats to driver " +
+s"more than $HEARTBEAT_MAX_FAILURES times")
+System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+}
}
}

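The interplay of the two new fields is worth spelling out: with the default 10s heartbeat interval, 60 consecutive failures give the driver roughly 10 minutes to recover before the executor exits. A standalone sketch of the pattern, with the interval and threshold assumed from the defaults above:

// Standalone model of the failure-counting logic added in this diff.
object HeartbeatBudget {
  val heartbeatIntervalMs = 10000L // assumed spark.executor.heartbeatInterval default
  val maxFailures = 60             // assumed spark.executor.heartbeat.maxFailures default

  // Total window before self-termination: 60 * 10s = 600s, i.e. ~10 minutes.
  val windowMs: Long = maxFailures * heartbeatIntervalMs

  private var failures = 0

  def onHeartbeat(succeeded: Boolean): Unit = {
    if (succeeded) {
      failures = 0 // any successful heartbeat resets the budget
    } else {
      failures += 1
      if (failures >= maxFailures) {
        sys.exit(56) // mirrors System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
      }
    }
  }
}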
@@ -39,6 +39,12 @@ object ExecutorExitCode {
/** ExternalBlockStore failed to create a local temporary directory after many attempts. */
val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55

+/**
+ * Executor is unable to send heartbeats to the driver more than
+ * "spark.executor.heartbeat.maxFailures" times.
+ */
+val HEARTBEAT_FAILURE = 56

def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -51,6 +57,8 @@
// TODO: replace external block store with concrete implementation name
case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
"ExternalBlockStore failed to create a local temporary directory."
+case HEARTBEAT_FAILURE =>
+"Unable to send heartbeats to driver."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
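With the new constant in place, anything that inspects executor exit statuses (a cluster-manager log scraper, for example) can translate 56 back into a reason. A usage sketch against the object above:

// Maps the new exit code to its human-readable explanation.
println(ExecutorExitCode.explainExitCode(ExecutorExitCode.HEARTBEAT_FAILURE))
// => "Unable to send heartbeats to driver."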
@@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
}

/**
- * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
- * of bytes removed from the pool's capacity.
+ * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
+ * Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
+ *
+ * @return number of bytes to be removed from the pool's capacity.
*/
-def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
-// First, shrink the pool by reclaiming free memory:
+def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
-decrementPoolSize(spaceFreedByReleasingUnusedMemory)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
@@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
-decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
@@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
storageRegionSize,
maxMemory - storageRegionSize) {

+assertInvariant()

// We always maintain this invariant:
-assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+private def assertInvariant(): Unit = {
+assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+}

override def maxStorageMemory: Long = synchronized {
maxMemory - onHeapExecutionMemoryPool.memoryUsed
@@ -77,7 +81,7 @@
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
-assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+assertInvariant()
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
@@ -99,9 +103,10 @@
math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
-val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+storageMemoryPool.decrementPoolSize(spaceToReclaim)
+onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
}
}
}
@@ -137,7 +142,7 @@
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
-assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+assertInvariant()
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
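The point of renaming shrinkPoolToFreeSpace to freeSpaceToShrinkPool is who resizes the pools: the storage pool no longer shrinks itself, so the caller can shrink one pool and grow the other under a single lock, keeping the pool-size invariant observable at all times. A simplified model of what SPARK-15260 preserves (the types here are stand-ins, not Spark's real memory classes):

// Toy pools illustrating why both resizes must happen under one lock:
// at every point observable through the manager's lock,
// storage.poolSize + execution.poolSize == maxMemory must hold.
class ToyPool(var poolSize: Long, var memoryUsed: Long) {
  def memoryFree: Long = poolSize - memoryUsed
}

class ToyMemoryManager(val maxMemory: Long, storage: ToyPool, execution: ToyPool) {
  def reclaimForExecution(needed: Long): Long = synchronized {
    // Analogue of freeSpaceToShrinkPool: compute how much can move...
    val toReclaim = math.min(needed, storage.memoryFree)
    // ...then resize both pools before anyone else can take this lock.
    storage.poolSize -= toReclaim
    execution.poolSize += toReclaim
    assert(storage.poolSize + execution.poolSize == maxMemory) // the invariant
    toReclaim
  }
}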
@@ -581,7 +581,7 @@ private[netty] class NettyRpcHandler(
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
assert(addr != null)
-val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
if (clients.putIfAbsent(client, JBoolean.TRUE) == null) {
dispatcher.postToAll(RemoteProcessConnected(clientAddr))
}
@@ -605,7 +605,7 @@
override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
if (addr != null) {
-val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
// If the remote RpcEnv listens to some address, we should also fire a
// RemoteProcessConnectionError for the remote RpcEnv listening address
@@ -625,7 +625,7 @@
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
if (addr != null) {
clients.remove(client)
-val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
nettyEnv.removeOutbox(clientAddr)
dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
val remoteEnvAddress = remoteAddresses.remove(clientAddr)
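The getHostName-to-getHostString change is subtle: java.net.InetSocketAddress.getHostName may perform a reverse-DNS lookup and can return a different string than the peer originally used, whereas getHostString returns the literal hostname or IP the address was built with, with no lookup. A small illustration of the standard JDK behavior:

import java.net.InetSocketAddress

val addr = new InetSocketAddress("192.168.1.10", 7077)
// No DNS involved; always the literal string the address was created with.
println(addr.getHostString) // "192.168.1.10"
// getHostName may block on a reverse lookup and return e.g. "node1.local",
// yielding an RpcAddress that no longer matches the one clients registered.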
@@ -29,7 +29,7 @@ import org.apache.spark.rpc.RpcAddress
* @param rpcAddress The socket address of the endpoint.
* @param name Name of the endpoint.
*/
-private[netty] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) {
+private[spark] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) {

require(name != null, "RpcEndpoint name must be provided.")

@@ -44,7 +44,11 @@ private[netty] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val nam
}
}

-private[netty] object RpcEndpointAddress {
+private[spark] object RpcEndpointAddress {

+def apply(host: String, port: Int, name: String): RpcEndpointAddress = {
+new RpcEndpointAddress(host, port, name)
+}

def apply(sparkUrl: String): RpcEndpointAddress = {
try {
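Widening the visibility to private[spark] and adding the three-argument factory lets other Spark-internal packages build endpoint addresses directly. A usage sketch; the spark://name@host:port form follows from this class's toString:

// Builds an endpoint address without first constructing an RpcAddress.
val address = RpcEndpointAddress("driver-host", 7077, "CoarseGrainedScheduler")
println(address) // spark://CoarseGrainedScheduler@driver-host:7077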
@@ -720,7 +720,16 @@ private[spark] class TaskSetManager(
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
-addPendingTask(index)
+
+if (successful(index)) {
+logInfo(
+s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
+"but another instance of the task has already succeeded, " +
+"so not re-queuing the task to be re-executed.")
+} else {
+addPendingTask(index)
+}

if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
&& reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
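The guard matters when speculative execution is on: a late failure from one attempt can arrive after another attempt of the same task has already succeeded, and re-queuing would run the task yet again. A toy model of the race, not Spark's real scheduler types:

import scala.collection.mutable

// successful(i) mirrors TaskSetManager's per-index success flags.
val successful = Array.fill(4)(false)
val pendingTasks = mutable.Queue[Int]()

def onAttemptFailed(index: Int): Unit = {
  if (successful(index)) {
    println(s"Task $index already succeeded in another attempt; not re-queuing")
  } else {
    pendingTasks.enqueue(index) // the addPendingTask(index) path
  }
}

successful(2) = true // a speculative copy of task 2 finished first
onAttemptFailed(2)   // late failure of the original attempt: no re-queue
onAttemptFailed(3)   // task 3 never succeeded: re-queued as before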
@@ -179,6 +179,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)

case RemoveExecutor(executorId, reason) =>
+// We will remove the executor's state and cannot restore it. However, the connection
+// between the driver and the executor may still be alive, so the executor won't exit
+// automatically; try to tell the executor to stop itself. See SPARK-13519.
+executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
context.reply(true)

@@ -263,7 +267,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
-case None => logInfo(s"Asked to remove non-existent executor $executorId")
+case None =>
+// SPARK-15262: If an executor is still alive even after the scheduler has removed
+// its metadata, we may receive a heartbeat from that executor and tell its block
+// manager to reregister itself. If that happens, the block manager master will know
+// about the executor, but the scheduler will not. Therefore, we should remove the
+// executor from the block manager when we hit this case.
+scheduler.sc.env.blockManager.master.removeExecutor(executorId)
+logInfo(s"Asked to remove non-existent executor $executorId")
}
}

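Taken together, SPARK-13519 and SPARK-15262 close a consistency loop between the scheduler and the block manager master: the driver now explicitly stops executors it removes, and cleans up block-manager state even for executors it no longer tracks. A toy model of the second fix, using stand-in collections rather than Spark's types:

import scala.collection.mutable

// Two views of cluster membership that SPARK-15262 keeps consistent.
val schedulerExecutors = mutable.Map[String, String]() // id -> host (scheduler's view)
val blockManagerExecutors = mutable.Set[String]()      // ids the BM master knows about

def removeExecutor(executorId: String, fixApplied: Boolean): Unit = {
  schedulerExecutors.remove(executorId) match {
    case Some(_) =>
      blockManagerExecutors -= executorId
    case None if fixApplied =>
      // The executor re-registered its block manager after the scheduler
      // dropped it; without this branch its block state would leak.
      blockManagerExecutors -= executorId
    case None => // pre-fix behavior: log and leave the stale BM entry behind
  }
}

blockManagerExecutors += "exec-1" // re-registered after a network hiccup
removeExecutor("exec-1", fixApplied = true)
assert(!blockManagerExecutors.contains("exec-1"))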