Closed
Changes from all commits
25 commits
bbe5587
Adding capability to prioritize peer executors based on rack awarenes…
shubhamchopra May 5, 2016
e130fc7
Minor modifications to get past the style check errors.
shubhamchopra May 6, 2016
e0df5a5
Using blockId hashcode as a source of randomness, so we don't keep ch…
shubhamchopra May 6, 2016
c6954e0
Several changes:
shubhamchopra May 9, 2016
6863e25
Adding null check so a Block Manager can be initialized without the m…
shubhamchopra May 9, 2016
f634f0e
Renaming classes/variables from rack to a more general topology.
shubhamchopra May 12, 2016
9275feb
Renaming classes/variables from rack to a more general topology.
shubhamchopra May 12, 2016
5d4178b
We continue to randomly choose peers, so there is no change in curren…
shubhamchopra May 12, 2016
2e3c195
Spelling correction and minor changes in comments to use a more gener…
shubhamchopra May 13, 2016
d481b69
Minor change. Changing replication info message to debug level.
shubhamchopra May 13, 2016
762b8d4
Providing peersReplicateTo to the prioritizer.
shubhamchopra May 16, 2016
6df3ceb
Adding developer api annotations to TopologyMapper and BlockReplicati…
shubhamchopra May 17, 2016
a6dbf3f
Changes recommended by @HyukjinKwon to fix style issues.
shubhamchopra May 18, 2016
487bfae
Updating prioritizer api to use current blockmanager id for self iden…
shubhamchopra May 20, 2016
2ef5199
BlockManagerInitialization now only uses a single message to master t…
shubhamchopra Jul 29, 2016
00c6d0c
1. Changing tail recursive function in BlockManager to an imperative …
shubhamchopra Aug 4, 2016
52c673e
Fixing style issues.
shubhamchopra Aug 5, 2016
c6b252b
Fixing style issues.
shubhamchopra Aug 5, 2016
59e91a7
Adding documentation to clarify on topology.
shubhamchopra Aug 9, 2016
e695cd2
Adding a simple file based topology-mapper.
shubhamchopra Aug 9, 2016
5c33cc8
Fixing style issues.
shubhamchopra Aug 10, 2016
90b4e77
Inlining call to get topology.
shubhamchopra Aug 12, 2016
6782720
converting to not a and not b
shubhamchopra Aug 12, 2016
d8c6210
1. Style modifications as suggested by @rxin. 2. Changing the BlockRe…
shubhamchopra Aug 23, 2016
632d043
Incorporating corrections and suggestions by @rxin
shubhamchopra Aug 29, 2016
167 changes: 82 additions & 85 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.storage
import java.io._
import java.nio.ByteBuffer

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag
@@ -44,6 +45,7 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer


/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
@@ -147,6 +149,8 @@ private[spark] class BlockManager(
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L

private var blockReplicationPolicy: BlockReplicationPolicy = _

/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -160,8 +164,24 @@
blockTransferService.init(this)
shuffleClient.init(appId)

blockManagerId = BlockManagerId(
executorId, blockTransferService.hostName, blockTransferService.port)
blockReplicationPolicy = {
val priorityClass = conf.get(
"spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
val clazz = Utils.classForName(priorityClass)
val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
logInfo(s"Using $priorityClass for block replication policy")
ret
}

val id =
BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

val idFromMaster = master.registerBlockManager(
id,
maxMemory,
slaveEndpoint)

blockManagerId = if (idFromMaster != null) idFromMaster else id

shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
@@ -170,12 +190,12 @@
blockManagerId
}

master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}

logInfo(s"Initialized BlockManager: $blockManagerId")
}

private def registerWithExternalShuffleServer() {
@@ -1111,109 +1131,86 @@ private[spark] class BlockManager(
}

/**
* Replicate block to another node. Not that this is a blocking call that returns after
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
private def replicate(
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit = {

val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
useOffHeap = level.useOffHeap,
deserialized = level.deserialized,
replication = 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)

var replicationFailed = false
var failures = 0
var done = false

// Get cached list of peers
peersForReplication ++= getPeers(forceFetch = false)

// Get a random peer. Note that this selection of a peer is deterministic on the block id.
// So assuming the list of peers does not change and no replication failures,
// if there are multiple attempts in the same node to replicate the same block,
// the same set of peers will be selected.
def getRandomPeer(): Option[BlockManagerId] = {
// If replication had failed, then force update the cached list of peers and remove the peers
// that have been already used
if (replicationFailed) {
peersForReplication.clear()
peersForReplication ++= getPeers(forceFetch = true)
peersForReplication --= peersReplicatedTo
peersForReplication --= peersFailedToReplicateTo
}
if (!peersForReplication.isEmpty) {
Some(peersForReplication(random.nextInt(peersForReplication.size)))
} else {
None
}
}

// One by one choose a random peer and try uploading the block to it
// If replication fails (e.g., target peer is down), force the list of cached peers
// to be re-fetched from driver and then pick another random peer for replication. Also
// temporarily black list the peer for which replication failed.
//
// This selection of a peer and replication is continued in a loop until one of the
// following 3 conditions is fulfilled:
// (i) specified number of peers have been replicated to
// (ii) too many failures in replicating to peers
// (iii) no peer left to replicate to
//
while (!done) {
getRandomPeer() match {
case Some(peer) =>
try {
val onePeerStartTime = System.currentTimeMillis
logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host,
peer.port,
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
tLevel,
classTag)
logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
peersForReplication -= peer
replicationFailed = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true // specified number of peers have been replicated to
}
} catch {
case NonFatal(e) =>
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
failures += 1
replicationFailed = true
peersFailedToReplicateTo += peer
if (failures > maxReplicationFailures) { // too many failures in replicating to peers
done = true
}
val numPeersToReplicateTo = level.replication - 1

val startTime = System.nanoTime

var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0

var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
getPeers(false),
mutable.HashSet.empty,
blockId,
numPeersToReplicateTo)

while (numFailures <= maxReplicationFailures &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size != numPeersToReplicateTo) {
val peer = peersForReplication.head
try {
val onePeerStartTime = System.nanoTime
logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host,
peer.port,
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
tLevel,
classTag)
logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
s" in ${(System.nanoTime - onePeerStartTime).toDouble / 1e6} ms")
peersForReplication = peersForReplication.tail
peersReplicatedTo += peer
} catch {
case NonFatal(e) =>
logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
peersFailedToReplicateTo += peer
// we have a failed replication, so we get the list of peers again
// we don't want peers we have already replicated to and the ones that
// have failed previously
val filteredPeers = getPeers(true).filter { p =>
!peersFailedToReplicateTo.contains(p) && !peersReplicatedTo.contains(p)
}
case None => // no peer left to replicate to
done = true

numFailures += 1
peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
filteredPeers,
peersReplicatedTo,
blockId,
numPeersToReplicateTo - peersReplicatedTo.size)
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)

logDebug(s"Replicating $blockId of ${data.size} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}

logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
}

/**
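The rewritten replicate() above delegates peer ordering to a pluggable BlockReplicationPolicy, chosen via the spark.storage.replication.policy setting read in initialize(). The trait definition is outside this diff, so the following is only a sketch of a topology-aware policy, assuming the signature implied by the two prioritize call sites above; the class name and the preference heuristic are hypothetical, not part of this PR.

import scala.collection.mutable
import scala.util.Random

import org.apache.spark.storage.{BlockId, BlockManagerId, BlockReplicationPolicy}

// Sketch: try peers whose topologyInfo differs from the local block
// manager's before peers that share it, so replicas tend to land on
// another rack.
class PreferDifferentTopologyPolicy extends BlockReplicationPolicy {
  override def prioritize(
      blockManagerId: BlockManagerId,
      peers: Seq[BlockManagerId],
      peersReplicatedTo: mutable.HashSet[BlockManagerId],
      blockId: BlockId,
      numReplicas: Int): List[BlockManagerId] = {
    // Seed randomness with the block id, as the existing random policy does,
    // so repeated attempts for the same block visit peers in the same order.
    val random = new Random(blockId.hashCode)
    val (offTopology, sameTopology) =
      peers.partition(_.topologyInfo != blockManagerId.topologyInfo)
    (random.shuffle(offTopology.toList) ++ random.shuffle(sameTopology.toList))
      .take(numReplicas)
  }
}

Such a policy would be enabled by setting spark.storage.replication.policy to its fully qualified class name.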
34 changes: 27 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -37,10 +37,11 @@ import org.apache.spark.util.Utils
class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int)
private var port_ : Int,
private var topologyInfo_ : Option[String])
extends Externalizable {

private def this() = this(null, null, 0) // For deserialization only
private def this() = this(null, null, 0, None) // For deserialization only

def executorId: String = executorId_

@@ -60,6 +61,8 @@

def port: Int = port_

def topologyInfo: Option[String] = topologyInfo_

def isDriver: Boolean = {
executorId == SparkContext.DRIVER_IDENTIFIER ||
executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
@@ -69,24 +72,33 @@
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
out.writeBoolean(topologyInfo_.isDefined)
// we only write topologyInfo if we have it
topologyInfo.foreach(out.writeUTF(_))
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
val isTopologyInfoAvailable = in.readBoolean()
topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None
}

@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)

override def toString: String = s"BlockManagerId($executorId, $host, $port)"
override def toString: String = s"BlockManagerId($executorId, $host, $port, $topologyInfo)"

override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port
override def hashCode: Int =
((executorId.hashCode * 41 + host.hashCode) * 41 + port) * 41 + topologyInfo.hashCode

override def equals(that: Any): Boolean = that match {
case id: BlockManagerId =>
executorId == id.executorId && port == id.port && host == id.host
executorId == id.executorId &&
port == id.port &&
host == id.host &&
topologyInfo == id.topologyInfo
case _ =>
false
}
@@ -101,10 +113,18 @@ private[spark] object BlockManagerId {
* @param execId ID of the executor.
* @param host Host name of the block manager.
* @param port Port of the block manager.
* @param topologyInfo topology information for the blockmanager, if available
* This can be network topology information for use while choosing peers
* while replicating data blocks. More information available here:
* [[org.apache.spark.storage.TopologyMapper]]
* @return A new [[org.apache.spark.storage.BlockManagerId]].
*/
def apply(execId: String, host: String, port: Int): BlockManagerId =
getCachedBlockManagerId(new BlockManagerId(execId, host, port))
def apply(
execId: String,
host: String,
port: Int,
topologyInfo: Option[String] = None): BlockManagerId =
getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo))

def apply(in: ObjectInput): BlockManagerId = {
val obj = new BlockManagerId()
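The new Externalizable format encodes the optional field as a boolean flag followed by the UTF string only when it is present. A minimal round-trip sketch; it must live under the org.apache.spark.storage package because the companion object is private[spark], and the executor ids, hosts, and rack string here are invented:

package org.apache.spark.storage

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object BlockManagerIdTopologyDemo {
  def main(args: Array[String]): Unit = {
    val withTopology =
      BlockManagerId("exec-1", "host-1.example.com", 50010, Some("/rack-7"))
    val withoutTopology =
      BlockManagerId("exec-2", "host-2.example.com", 50010) // topologyInfo defaults to None

    // writeExternal emits the flag, then the topology string only if defined
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    withTopology.writeExternal(out)
    out.flush()

    // apply(in) reads the fields back and returns a cached instance
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val decoded = BlockManagerId(in)

    assert(decoded == withTopology)          // equality now covers topologyInfo
    assert(withoutTopology.topologyInfo.isEmpty)
  }
}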
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -50,12 +50,20 @@ class BlockManagerMaster(
logInfo("Removal of executor " + execId + " requested")
}

/** Register the BlockManager's id with the driver. */
/**
* Register the BlockManager's id with the driver. The input BlockManagerId does not contain
* topology information. This information is obtained from the master and we respond with an
* updated BlockManagerId fleshed out with this information.
*/
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
blockManagerId: BlockManagerId,
maxMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
logInfo(s"Registering BlockManager $blockManagerId")
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $blockManagerId")
val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $updatedId")
updatedId
}

def updateBlockInfo(
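Both extension points in this PR are constructed reflectively from configuration: the replication policy in BlockManager.initialize and the topology mapper in the master endpoint below. A minimal wiring sketch, with the com.example names standing in for user-supplied implementations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // falls back to RandomBlockReplicationPolicy when unset
  .set("spark.storage.replication.policy",
    "com.example.PreferDifferentTopologyPolicy")
  // falls back to DefaultTopologyMapper when unset
  .set("spark.storage.replication.topologyMapper",
    "com.example.StaticRackTopologyMapper")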
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -55,10 +55,21 @@ class BlockManagerMasterEndpoint(
private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)

private val topologyMapper = {
val topologyMapperClassName = conf.get(
"spark.storage.replication.topologyMapper", classOf[DefaultTopologyMapper].getName)
val clazz = Utils.classForName(topologyMapperClassName)
val mapper =
clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
logInfo(s"Using $topologyMapperClassName for getting topology information")
mapper
}

logInfo("BlockManagerMasterEndpoint up")

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
register(blockManagerId, maxMemSize, slaveEndpoint)
context.reply(true)
context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -298,7 +309,21 @@
).map(_.flatten.toSeq)
}

private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
/**
* Returns the BlockManagerId with topology information populated, if available.
*/
private def register(
idWithoutTopologyInfo: BlockManagerId,
maxMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
// the dummy id is not expected to contain the topology information.
// we get that info here and respond back with a more fleshed out block manager id
val id = BlockManagerId(
idWithoutTopologyInfo.executorId,
idWithoutTopologyInfo.host,
idWithoutTopologyInfo.port,
topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))

val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
@@ -318,6 +343,7 @@
id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
id
}

private def updateBlockInfo(
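register() asks topologyMapper.getTopologyForHost for the incoming block manager's host and responds with an id carrying that information. The TopologyMapper base class is not shown in this diff, so the sketch below assumes only what the code above implies: a constructor taking a SparkConf (see the reflective getConstructor call) and a getTopologyForHost method returning Option[String]. The rack-map configuration key and its host=/rack format are invented for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.storage.TopologyMapper

// Sketch: resolve racks from a static map in the configuration, e.g.
// spark.storage.replication.rackMap = "host-1=/rack-1,host-2=/rack-2".
// Hosts missing from the map get no topology info (None).
class StaticRackTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
  private val rackByHost: Map[String, String] =
    conf.get("spark.storage.replication.rackMap", "")
      .split(",")
      .filter(_.contains("="))
      .map { entry =>
        val Array(host, rack) = entry.split("=", 2)
        host.trim -> rack.trim
      }
      .toMap

  override def getTopologyForHost(hostname: String): Option[String] =
    rackByHost.get(hostname)
}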