From bbe5587cee5a50df8968b79281fa68a31f4ad6fd Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 5 May 2016 18:06:14 -0400 Subject: [PATCH 01/25] Adding capability to prioritize peer executors based on rack awareness while replicating blocks. --- .../apache/spark/storage/BlockManager.scala | 182 +++++++++--------- .../apache/spark/storage/BlockManagerId.scala | 20 +- .../spark/storage/BlockManagerMaster.scala | 4 + .../storage/BlockManagerMasterEndpoint.scala | 7 + .../spark/storage/BlockManagerMessages.scala | 2 + .../storage/RackAwarePrioritization.scala | 49 +++++ 6 files changed, 171 insertions(+), 93 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index aa29acfd7046..475c4c38ef5a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -26,7 +26,6 @@ import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal - import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging @@ -44,6 +43,8 @@ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer +import scala.annotation.tailrec + /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( val data: Iterator[Any], @@ -147,6 +148,10 @@ private[spark] class BlockManager( private val peerFetchLock = new Object private var lastPeerFetchTime = 0L + private val rackAwareReplication = conf.getBoolean("spark.replication.rackawareness", false) + + private var rackAwarePrioritizer: RackAwarePriotization = _ + /** * Initializes the BlockManager with the given appId. This is not performed in the constructor as * the appId may not be known at BlockManager instantiation time (in particular for the driver, @@ -160,8 +165,26 @@ private[spark] class BlockManager( blockTransferService.init(this) shuffleClient.init(appId) + val rackInfo = if (rackAwareReplication) { + logInfo("Rack aware block replication enabled. Getting topology information") + val rackStr = master.getRackInfo(blockTransferService.hostName) + logInfo(s"Obtained $rackStr") + if (rackStr.isEmpty) None else Some(rackStr) + } else { + None + } + + rackAwarePrioritizer = if (rackAwareReplication) { + val priorityClass = conf.get("spark.replication.rackawareness.prioritizer", "") + assert(!priorityClass.isEmpty, "Need to specify a class for prioritizing " + + "peers when using rack aware block replication") + Utils.classForName(priorityClass).asInstanceOf[RackAwarePriotization] + } else { + new DefaultRackAwarePrioritization(blockTransferService.hostName) + } + blockManagerId = BlockManagerId( - executorId, blockTransferService.hostName, blockTransferService.port) + executorId, blockTransferService.hostName, blockTransferService.port, rackInfo) shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort") @@ -1111,109 +1134,90 @@ private[spark] class BlockManager( } /** - * Replicate block to another node. Not that this is a blocking call that returns after - * the block has been replicated. 
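
A note on the prioritizer wiring added in initialize() above: Utils.classForName returns a Class[_], so casting its result directly to RackAwarePriotization would fail at runtime with a ClassCastException; an instance has to be constructed first. A minimal sketch of the intended wiring, in the context of initialize() and assuming the configured class has a no-arg constructor (the default implementation takes a host argument, so it could not be loaded this way unchanged):

    // Hedged sketch, not part of the patch: instantiate before casting.
    val priorityClass = conf.get("spark.replication.rackawareness.prioritizer", "")
    val prioritizer = Utils.classForName(priorityClass)
      .newInstance()                        // reflectively build the plug-in
      .asInstanceOf[RackAwarePriotization]  // then cast to the trait

The same consideration applies wherever a class name from the configuration is turned into an instance in this series.
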
- */ - private def replicate( - blockId: BlockId, - data: ChunkedByteBuffer, - level: StorageLevel, - classTag: ClassTag[_]): Unit = { + * Replicate block to another node. Not that this is a blocking call that returns after + * the block has been replicated. + */ + private def replicateNew( + blockId: BlockId, + data: ChunkedByteBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Unit = { + val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) - val numPeersToReplicateTo = level.replication - 1 - val peersForReplication = new ArrayBuffer[BlockManagerId] - val peersReplicatedTo = new ArrayBuffer[BlockManagerId] - val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] val tLevel = StorageLevel( useDisk = level.useDisk, useMemory = level.useMemory, useOffHeap = level.useOffHeap, deserialized = level.deserialized, replication = 1) - val startTime = System.currentTimeMillis - val random = new Random(blockId.hashCode) - - var replicationFailed = false - var failures = 0 - var done = false - - // Get cached list of peers - peersForReplication ++= getPeers(forceFetch = false) - - // Get a random peer. Note that this selection of a peer is deterministic on the block id. - // So assuming the list of peers does not change and no replication failures, - // if there are multiple attempts in the same node to replicate the same block, - // the same set of peers will be selected. - def getRandomPeer(): Option[BlockManagerId] = { - // If replication had failed, then force update the cached list of peers and remove the peers - // that have been already used - if (replicationFailed) { - peersForReplication.clear() - peersForReplication ++= getPeers(forceFetch = true) - peersForReplication --= peersReplicatedTo - peersForReplication --= peersFailedToReplicateTo - } - if (!peersForReplication.isEmpty) { - Some(peersForReplication(random.nextInt(peersForReplication.size))) - } else { - None - } - } - // One by one choose a random peer and try uploading the block to it - // If replication fails (e.g., target peer is down), force the list of cached peers - // to be re-fetched from driver and then pick another random peer for replication. Also - // temporarily black list the peer for which replication failed. 
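
The removed loop above seeds its generator with the block id, so peer choice is deterministic per block across repeated attempts on the same node, exactly as the comment describes. A standalone illustration of that property (peer and block names are invented):

    import scala.util.Random

    val peers = Seq("exec-1", "exec-2", "exec-3", "exec-4")
    val seed = "rdd_0_17".hashCode                // stands in for blockId.hashCode
    val firstAttempt = new Random(seed).shuffle(peers)
    val secondAttempt = new Random(seed).shuffle(peers)
    assert(firstAttempt == secondAttempt)         // identical order on every attempt
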
- // - // This selection of a peer and replication is continued in a loop until one of the - // following 3 conditions is fulfilled: - // (i) specified number of peers have been replicated to - // (ii) too many failures in replicating to peers - // (iii) no peer left to replicate to - // - while (!done) { - getRandomPeer() match { - case Some(peer) => - try { - val onePeerStartTime = System.currentTimeMillis - logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") - blockTransferService.uploadBlockSync( - peer.host, - peer.port, - peer.executorId, - blockId, - new NettyManagedBuffer(data.toNetty), - tLevel, - classTag) - logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms" - .format(System.currentTimeMillis - onePeerStartTime)) - peersReplicatedTo += peer - peersForReplication -= peer - replicationFailed = false - if (peersReplicatedTo.size == numPeersToReplicateTo) { - done = true // specified number of peers have been replicated to + val numPeersToReplicateTo = level.replication - 1 + + @tailrec def replicateBlock( + numFailures: Int, + peersForReplication: Seq[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], + peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = { + + if (numFailures > maxReplicationFailures + || peersForReplication.isEmpty + || peersReplicatedTo.size == numPeersToReplicateTo) { + // This selection of a peer and replication is continued in a loop until one of the + // following 3 conditions is fulfilled: + // (i) specified number of peers have been replicated to + // (ii) too many failures in replicating to peers + // (iii) no peer left to replicate to + peersReplicatedTo + } else { + val peer = peersForReplication.head + val (updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) = try { + val onePeerStartTime = System.currentTimeMillis + logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") + blockTransferService.uploadBlockSync( + peer.host, + peer.port, + peer.executorId, + blockId, + new NettyManagedBuffer(data.toNetty), + tLevel, + classTag) + logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" + + s" in ${System.currentTimeMillis - onePeerStartTime} ms") + // the block was replicated, lets update state and move ahead + (numFailures, + peersForReplication.tail, + peersReplicatedTo + peer, + peersFailedToReplicateTo) + } catch { + case NonFatal(e) => + logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e) + val updatedFailedPeers = peersFailedToReplicateTo + peer + // we have a failed replication, so we get the list of peers again + // we don't want peers we have already replicated to and the ones that + // have failed previously + val updatedPeers = rackAwarePrioritizer.prioritize(getPeers(true)).filter{p => + !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) } - } catch { - case NonFatal(e) => - logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) - failures += 1 - replicationFailed = true - peersFailedToReplicateTo += peer - if (failures > maxReplicationFailures) { // too many failures in replicating to peers - done = true - } - } - case None => // no peer left to replicate to - done = true + (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers) + } + + replicateBlock(updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) } } - val timeTakeMs = (System.currentTimeMillis - startTime) + + val startTime = System.currentTimeMillis + val 
peersReplicatedTo = replicateBlock(0, + rackAwarePrioritizer.prioritize(getPeers(false)), + Set.empty, + Set.empty) logDebug(s"Replicating $blockId of ${data.size} bytes to " + - s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms") + s"${peersReplicatedTo.size} peer(s) took ${System.currentTimeMillis - startTime} ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block $blockId replicated to only " + s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers") } + + logInfo(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index cae7c9ed952f..142622be035b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -37,10 +37,11 @@ import org.apache.spark.util.Utils class BlockManagerId private ( private var executorId_ : String, private var host_ : String, - private var port_ : Int) + private var port_ : Int, + private var rackInfo_ : Option[String]) extends Externalizable { - private def this() = this(null, null, 0) // For deserialization only + private def this() = this(null, null, 0, None) // For deserialization only def executorId: String = executorId_ @@ -69,12 +70,23 @@ class BlockManagerId private ( out.writeUTF(executorId_) out.writeUTF(host_) out.writeInt(port_) + out.writeBoolean(rackInfo_.isDefined) + // if we don't keep rack information, we just write an empty string. + out.writeUTF(rackInfo_.getOrElse("")) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { executorId_ = in.readUTF() host_ = in.readUTF() port_ = in.readInt() + val isRackInfoAvailable = in.readBoolean() + rackInfo_ = if (isRackInfoAvailable) { + Some(in.readUTF()) + } else { + // we would read an empty string in this case + in.readUTF() + None + } } @throws(classOf[IOException]) @@ -103,8 +115,8 @@ private[spark] object BlockManagerId { * @param port Port of the block manager. * @return A new [[org.apache.spark.storage.BlockManagerId]]. */ - def apply(execId: String, host: String, port: Int): BlockManagerId = - getCachedBlockManagerId(new BlockManagerId(execId, host, port)) + def apply(execId: String, host: String, port: Int, rack: Option[String] = None): BlockManagerId = + getCachedBlockManagerId(new BlockManagerId(execId, host, port, rack)) def apply(in: ObjectInput): BlockManagerId = { val obj = new BlockManagerId() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 8655cf10fc28..e67307c98526 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -227,6 +227,10 @@ class BlockManagerMaster( } } + def getRackInfo(host: String): String = { + driverEndpoint.askWithRetry[String](GetRackInfo(host)) + } + /** Send a one-way message to the master endpoint, to which we expect it to reply with true. 
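
The BlockManagerId change above serializes the optional rack string as a presence flag followed by a UTF string that is always written, even when empty, keeping reads and writes symmetric. The same pattern in isolation, using a made-up Record class rather than anything from the patch:

    import java.io._

    class Record(var rack: Option[String]) extends Externalizable {
      def this() = this(None)  // no-arg constructor needed for deserialization

      override def writeExternal(out: ObjectOutput): Unit = {
        out.writeBoolean(rack.isDefined)
        out.writeUTF(rack.getOrElse(""))  // always write a string, possibly empty
      }

      override def readExternal(in: ObjectInput): Unit = {
        val defined = in.readBoolean()
        val value = in.readUTF()          // consume the string unconditionally
        rack = if (defined) Some(value) else None
      }
    }

Reading the string unconditionally matters: whatever writeExternal emits must be consumed in the same order, whether or not the value is present.
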
*/ private def tell(message: Any) { if (!driverEndpoint.askWithRetry[Boolean](message)) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 8fa12150114d..bd48c3f46bb0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -124,6 +124,13 @@ class BlockManagerMasterEndpoint( } case None => context.reply(false) } + + case GetRackInfo(host) => context.reply(getRackInfoForHost(host)) + + } + + private def getRackInfoForHost(host: String): String = { + s"$host-1" } private def removeRdd(rddId: Int): Future[Seq[Int]] = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 6bded9270050..1c0f34e1ed15 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -111,4 +111,6 @@ private[spark] object BlockManagerMessages { case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster + + case class GetRackInfo(host: String) extends ToBlockManagerMaster } diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala new file mode 100644 index 000000000000..ce0d48ec170d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.internal.Logging + +/* Trait that should be implemented by any class implementing rack aware prioritization */ +trait RackAwarePriotization { + /** + * Method to prioritize a bunch of candidate peers of a block + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. Lower the index of a peer, higher its priority + */ + def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] +} + +class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { + /** + * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, + * that just makes sure we put blocks on different hosts, if possible + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority + */ + override def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] = { + logInfo(s"Input peers : ${peers.mkString(", ")}") + val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) + val peersOnHost = peers.filter(p => p.host.equals(host)) + val ret = peersOnOtherHosts ++ peersOnHost + logInfo(s"Prioritized peers : ${ret.mkString(", ")}") + ret + } +} \ No newline at end of file From e130fc7817509f31e01565d752006062519e0d1f Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 6 May 2016 13:40:47 -0400 Subject: [PATCH 02/25] Minor modifications to get past the style check errors. --- .../apache/spark/storage/BlockManager.scala | 17 +++++++----- .../storage/RackAwarePrioritization.scala | 26 ++++++++++--------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 475c4c38ef5a..f5d24abb79d2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,12 +20,14 @@ package org.apache.spark.storage import java.io._ import java.nio.ByteBuffer +import scala.annotation.tailrec import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal + import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging @@ -43,8 +45,6 @@ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer -import scala.annotation.tailrec - /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( val data: Iterator[Any], @@ -1134,10 +1134,15 @@ private[spark] class BlockManager( } /** - * Replicate block to another node. Not that this is a blocking call that returns after - * the block has been replicated. - */ - private def replicateNew( + * Replicate block to another node. Note that this is a blocking call that returns after + * the block has been replicated. + * + * @param blockId + * @param data + * @param level + * @param classTag + */ + private def replicate( blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel, diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala index ce0d48ec170d..cb790b5229cc 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -21,23 +21,25 @@ import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ trait RackAwarePriotization { + /** - * Method to prioritize a bunch of candidate peers of a block - * - * @param peers A list of peers of a BlockManager - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ + * Method to prioritize a bunch of candidate peers of a block + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority + */ def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] } class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { + /** - * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, - * that just makes sure we put blocks on different hosts, if possible - * - * @param peers A list of peers of a BlockManager - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ + * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, + * that just makes sure we put blocks on different hosts, if possible + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. Lower the index of a peer, higher its priority + */ override def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] = { logInfo(s"Input peers : ${peers.mkString(", ")}") val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) @@ -46,4 +48,4 @@ class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization logInfo(s"Prioritized peers : ${ret.mkString(", ")}") ret } -} \ No newline at end of file +} From e0df5a57cdcbd1a0b538ed1e9cbf2cf11c0da7d3 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 6 May 2016 16:31:22 -0400 Subject: [PATCH 03/25] Using blockId hashcode as a source of randomness, so we don't keep choosing the same peers for replication. --- .../org/apache/spark/storage/BlockManager.scala | 4 ++-- .../spark/storage/RackAwarePrioritization.scala | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f5d24abb79d2..5364e63312bc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1200,7 +1200,7 @@ private[spark] class BlockManager( // we have a failed replication, so we get the list of peers again // we don't want peers we have already replicated to and the ones that // have failed previously - val updatedPeers = rackAwarePrioritizer.prioritize(getPeers(true)).filter{p => + val updatedPeers = rackAwarePrioritizer.prioritize(getPeers(true), blockId).filter{p => !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) } (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers) @@ -1212,7 +1212,7 @@ private[spark] class BlockManager( val startTime = System.currentTimeMillis val peersReplicatedTo = replicateBlock(0, - rackAwarePrioritizer.prioritize(getPeers(false)), + rackAwarePrioritizer.prioritize(getPeers(false), blockId), Set.empty, Set.empty) logDebug(s"Replicating $blockId of ${data.size} bytes to " + diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala index cb790b5229cc..7dcf48421fb8 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -17,6 +17,8 @@ package org.apache.spark.storage +import scala.util.Random + import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ @@ -24,11 +26,12 @@ trait RackAwarePriotization { /** * Method to prioritize a bunch of candidate peers of a 
block - * * @param peers A list of peers of a BlockManager + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] + def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] } class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { @@ -38,12 +41,16 @@ class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization * that just makes sure we put blocks on different hosts, if possible * * @param peers A list of peers of a BlockManager + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] = { + override def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { + val random = new Random(blockId.hashCode) + logInfo(s"Input peers : ${peers.mkString(", ")}") - val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) - val peersOnHost = peers.filter(p => p.host.equals(host)) + val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)).sortBy(_ => random.nextInt) + val peersOnHost = peers.filter(p => p.host.equals(host)).sortBy(_ => random.nextInt) val ret = peersOnOtherHosts ++ peersOnHost logInfo(s"Prioritized peers : ${ret.mkString(", ")}") ret From c6954e07a94542effe8360fd240a02cbce36a9c8 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Mon, 9 May 2016 16:36:53 -0400 Subject: [PATCH 04/25] Several changes: 1. Adding rack attribute to hashcode and equals to block manager id. 2. Removing boolean check for rack awareness. Asking master for rack info, and master uses topology mapper. 3. Adding a topology mapper trait and a default implementation that block manager master endpoint uses to discern topology information. --- .../apache/spark/storage/BlockManager.scala | 30 ++++++------- .../apache/spark/storage/BlockManagerId.scala | 9 ++-- .../storage/BlockManagerMasterEndpoint.scala | 16 ++++++- .../storage/RackAwarePrioritization.scala | 10 ++--- .../apache/spark/storage/TopologyMapper.scala | 42 +++++++++++++++++++ 5 files changed, 84 insertions(+), 23 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5364e63312bc..f9c8c80ca9b9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -148,8 +148,6 @@ private[spark] class BlockManager( private val peerFetchLock = new Object private var lastPeerFetchTime = 0L - private val rackAwareReplication = conf.getBoolean("spark.replication.rackawareness", false) - private var rackAwarePrioritizer: RackAwarePriotization = _ /** @@ -165,21 +163,22 @@ private[spark] class BlockManager( blockTransferService.init(this) shuffleClient.init(appId) - val rackInfo = if (rackAwareReplication) { - logInfo("Rack aware block replication enabled. 
Getting topology information") + val rackInfo = { val rackStr = master.getRackInfo(blockTransferService.hostName) - logInfo(s"Obtained $rackStr") - if (rackStr.isEmpty) None else Some(rackStr) - } else { - None + if (rackStr.isEmpty) { + None + } else { + Some(rackStr) + } } - rackAwarePrioritizer = if (rackAwareReplication) { - val priorityClass = conf.get("spark.replication.rackawareness.prioritizer", "") - assert(!priorityClass.isEmpty, "Need to specify a class for prioritizing " + - "peers when using rack aware block replication") - Utils.classForName(priorityClass).asInstanceOf[RackAwarePriotization] + val priorityClass = conf.get("spark.replication.rackawareness.prioritizer", "") + rackAwarePrioritizer = if (!priorityClass.isEmpty) { + val ret = Utils.classForName(priorityClass).asInstanceOf[RackAwarePriotization] + logInfo(s"Using $priorityClass for prioritizing peers") + ret } else { + logInfo("Using DefaultRackAwarePrioritization for prioritizing peers") new DefaultRackAwarePrioritization(blockTransferService.hostName) } @@ -199,6 +198,8 @@ private[spark] class BlockManager( if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { registerWithExternalShuffleServer() } + + logInfo(s"Initialized BlockManager: $blockManagerId") } private def registerWithExternalShuffleServer() { @@ -1200,9 +1201,10 @@ private[spark] class BlockManager( // we have a failed replication, so we get the list of peers again // we don't want peers we have already replicated to and the ones that // have failed previously - val updatedPeers = rackAwarePrioritizer.prioritize(getPeers(true), blockId).filter{p => + val filteredPeers = getPeers(true).filter{p => !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) } + val updatedPeers = rackAwarePrioritizer.prioritize(filteredPeers, blockId) (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index 142622be035b..ee035cec649e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -61,6 +61,8 @@ class BlockManagerId private ( def port: Int = port_ + def rack: Option[String] = rackInfo_ + def isDriver: Boolean = { executorId == SparkContext.DRIVER_IDENTIFIER || executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER @@ -92,13 +94,14 @@ class BlockManagerId private ( @throws(classOf[IOException]) private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this) - override def toString: String = s"BlockManagerId($executorId, $host, $port)" + override def toString: String = s"BlockManagerId($executorId, $host, $port, $rack)" - override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + override def hashCode: Int = + ((executorId.hashCode * 41 + host.hashCode) * 41 + port) * 41 + rack.hashCode override def equals(that: Any): Boolean = that match { case id: BlockManagerId => - executorId == id.executorId && port == id.port && host == id.host + executorId == id.executorId && port == id.port && host == id.host && rack == id.rack case _ => false } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index bd48c3f46bb0..9ba27a357344 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -55,6 +55,20 @@ class BlockManagerMasterEndpoint( private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool") private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) + private val topologyMapperClassName = + conf.get("spark.replication.rackawareness.topologyMapper", "") + + private val topologyMapper = if (!topologyMapperClassName.isEmpty) { + val ret = Utils.classForName(topologyMapperClassName).asInstanceOf[TopologyMapper] + logInfo(s"Using $topologyMapperClassName for mapping host names to topology") + ret + } else { + logInfo(s"Using DefaultTopologyMapper to map host names to topology") + new DefaultTopologyMapper + } + + logInfo("BlockManagerMasterEndpoint up") + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) => register(blockManagerId, maxMemSize, slaveEndpoint) @@ -130,7 +144,7 @@ class BlockManagerMasterEndpoint( } private def getRackInfoForHost(host: String): String = { - s"$host-1" + topologyMapper.getRackForHost(host) } private def removeRdd(rddId: Int): Future[Seq[Int]] = { diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala index 7dcf48421fb8..99c351dd863e 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -48,11 +48,11 @@ class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization override def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) - logInfo(s"Input peers : ${peers.mkString(", ")}") - val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)).sortBy(_ => random.nextInt) - val peersOnHost = peers.filter(p => p.host.equals(host)).sortBy(_ => random.nextInt) - val ret = peersOnOtherHosts ++ peersOnHost - logInfo(s"Prioritized peers : ${ret.mkString(", ")}") + logDebug(s"Input peers : ${peers.mkString(", ")}") + val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) + val peersOnHost = peers.filter(p => p.host.equals(host)) + val ret = random.shuffle(peersOnOtherHosts) ++ random.shuffle(peersOnHost) + logDebug(s"Prioritized peers : ${ret.mkString(", ")}") ret } } diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala new file mode 100644 index 000000000000..f579cced59e8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.internal.Logging
+
+trait TopologyMapper {
+  /**
+   * Gets the rack information given the host name
+   *
+   * @param hostname Hostname
+   * @return rack information for the given hostname. One can use a 'topology delimiter'
+   *         to make this rack information nested.
+   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
+   *         ‘myrack’ is the rack identifier, and ‘myhost’ is the individual host.
+   *         Note that this would only help if the RackAwarePrioritization implementation
+   *         being provided is smart enough to make use of all this information.
+   */
+  def getRackForHost(hostname: String): String
+}
+
+class DefaultTopologyMapper extends TopologyMapper with Logging {
+  override def getRackForHost(hostname: String): String = {
+    logDebug(s"Got a request for $hostname")
+    "DefaultRack"
+  }
+}

From 6863e25df4ebd56fd69bac6f62369c1a3eb128eb Mon Sep 17 00:00:00 2001
From: Shubham Chopra
Date: Mon, 9 May 2016 16:58:21 -0400
Subject: [PATCH 05/25] Adding null check so a Block Manager can be
 initialized without the master.

---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f9c8c80ca9b9..a06beec7c56c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -165,7 +165,7 @@ private[spark] class BlockManager(
 
   val rackInfo = {
     val rackStr = master.getRackInfo(blockTransferService.hostName)
-    if (rackStr.isEmpty) {
+    if (rackStr == null || rackStr.isEmpty) {
       None
     } else {
       Some(rackStr)

From f634f0e314bc773e76b7f3c788d123a0a7597ad9 Mon Sep 17 00:00:00 2001
From: Shubham Chopra
Date: Thu, 12 May 2016 17:09:16 -0400
Subject: [PATCH 06/25] Renaming classes/variables from rack to a more general
 topology.

---
 .../apache/spark/storage/BlockManager.scala   |  6 ++--
 .../apache/spark/storage/BlockManagerId.scala | 30 +++++++++++--------
 .../spark/storage/BlockManagerMaster.scala    |  2 +-
 .../storage/BlockManagerMasterEndpoint.scala  |  4 +--
 .../spark/storage/BlockManagerMessages.scala  |  2 +-
 ...a => BlockReplicationPrioritization.scala} |  7 +++--
 6 files changed, 30 insertions(+), 21 deletions(-)
 rename core/src/main/scala/org/apache/spark/storage/{RackAwarePrioritization.scala => BlockReplicationPrioritization.scala} (93%)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a06beec7c56c..152b2be47b81 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -148,7 +148,7 @@ private[spark] class BlockManager(
   private val peerFetchLock = new Object
   private var lastPeerFetchTime = 0L
 
-  private var rackAwarePrioritizer: RackAwarePriotization = _
+  private var rackAwarePrioritizer: BlockReplicationPriotization = _
 
   /**
    * Initializes the BlockManager with the given appId.
This is not performed in the constructor as @@ -174,12 +174,12 @@ private[spark] class BlockManager( val priorityClass = conf.get("spark.replication.rackawareness.prioritizer", "") rackAwarePrioritizer = if (!priorityClass.isEmpty) { - val ret = Utils.classForName(priorityClass).asInstanceOf[RackAwarePriotization] + val ret = Utils.classForName(priorityClass).asInstanceOf[BlockReplicationPriotization] logInfo(s"Using $priorityClass for prioritizing peers") ret } else { logInfo("Using DefaultRackAwarePrioritization for prioritizing peers") - new DefaultRackAwarePrioritization(blockTransferService.hostName) + new DefaultBlockReplicationPrioritization(blockTransferService.hostName) } blockManagerId = BlockManagerId( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index ee035cec649e..c825154c1431 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -38,7 +38,7 @@ class BlockManagerId private ( private var executorId_ : String, private var host_ : String, private var port_ : Int, - private var rackInfo_ : Option[String]) + private var topologyInfo_ : Option[String]) extends Externalizable { private def this() = this(null, null, 0, None) // For deserialization only @@ -61,7 +61,7 @@ class BlockManagerId private ( def port: Int = port_ - def rack: Option[String] = rackInfo_ + def topologyInfo: Option[String] = topologyInfo_ def isDriver: Boolean = { executorId == SparkContext.DRIVER_IDENTIFIER || @@ -72,17 +72,17 @@ class BlockManagerId private ( out.writeUTF(executorId_) out.writeUTF(host_) out.writeInt(port_) - out.writeBoolean(rackInfo_.isDefined) - // if we don't keep rack information, we just write an empty string. - out.writeUTF(rackInfo_.getOrElse("")) + out.writeBoolean(topologyInfo_.isDefined) + // if we don't keep topology information, we just write an empty string. + out.writeUTF(topologyInfo_.getOrElse("")) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { executorId_ = in.readUTF() host_ = in.readUTF() port_ = in.readInt() - val isRackInfoAvailable = in.readBoolean() - rackInfo_ = if (isRackInfoAvailable) { + val isTopologyInfoAvailable = in.readBoolean() + topologyInfo_ = if (isTopologyInfoAvailable) { Some(in.readUTF()) } else { // we would read an empty string in this case @@ -94,14 +94,17 @@ class BlockManagerId private ( @throws(classOf[IOException]) private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this) - override def toString: String = s"BlockManagerId($executorId, $host, $port, $rack)" + override def toString: String = s"BlockManagerId($executorId, $host, $port, $topologyInfo)" override def hashCode: Int = - ((executorId.hashCode * 41 + host.hashCode) * 41 + port) * 41 + rack.hashCode + ((executorId.hashCode * 41 + host.hashCode) * 41 + port) * 41 + topologyInfo.hashCode override def equals(that: Any): Boolean = that match { case id: BlockManagerId => - executorId == id.executorId && port == id.port && host == id.host && rack == id.rack + executorId == id.executorId && + port == id.port && + host == id.host && + topologyInfo == id.topologyInfo case _ => false } @@ -118,8 +121,11 @@ private[spark] object BlockManagerId { * @param port Port of the block manager. * @return A new [[org.apache.spark.storage.BlockManagerId]]. 
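
With topologyInfo folded into hashCode and equals above, ids that differ only in their topology string are distinct values, which also affects the interning done by getCachedBlockManagerId: the id serves as its own cache key. A sketch of that interning pattern, assuming a ConcurrentHashMap-backed cache (the cache itself sits outside these hunks):

    import java.util.concurrent.ConcurrentHashMap

    val cache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()

    def intern(id: BlockManagerId): BlockManagerId = {
      cache.putIfAbsent(id, id)  // first equal id wins
      cache.get(id)              // hand back the canonical instance
    }

One practical consequence: topology must be resolved before the id is constructed, as initialize() now does, or the same executor could end up interned under two different ids.
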
*/ - def apply(execId: String, host: String, port: Int, rack: Option[String] = None): BlockManagerId = - getCachedBlockManagerId(new BlockManagerId(execId, host, port, rack)) + def apply(execId: String, + host: String, + port: Int, + topologyInfo: Option[String] = None): BlockManagerId = + getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo)) def apply(in: ObjectInput): BlockManagerId = { val obj = new BlockManagerId() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e67307c98526..4f9af25fd6e0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -228,7 +228,7 @@ class BlockManagerMaster( } def getRackInfo(host: String): String = { - driverEndpoint.askWithRetry[String](GetRackInfo(host)) + driverEndpoint.askWithRetry[String](GetTopologyInfo(host)) } /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 9ba27a357344..fb5045e795c7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -139,11 +139,11 @@ class BlockManagerMasterEndpoint( case None => context.reply(false) } - case GetRackInfo(host) => context.reply(getRackInfoForHost(host)) + case GetTopologyInfo(host) => context.reply(getTopologyInfoForHost(host)) } - private def getRackInfoForHost(host: String): String = { + private def getTopologyInfoForHost(host: String): String = { topologyMapper.getRackForHost(host) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 1c0f34e1ed15..d724755eeb81 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -112,5 +112,5 @@ private[spark] object BlockManagerMessages { case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster - case class GetRackInfo(host: String) extends ToBlockManagerMaster + case class GetTopologyInfo(host: String) extends ToBlockManagerMaster } diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala similarity index 93% rename from core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala rename to core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 99c351dd863e..8680568f6976 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -22,10 +22,11 @@ import scala.util.Random import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ -trait RackAwarePriotization { +trait BlockReplicationPriotization { /** * Method to prioritize a bunch of candidate peers of a block + * * @param peers A list of peers of a BlockManager * @param blockId BlockId of the block being replicated. 
This can be used as a source of * randomness if needed. @@ -34,7 +35,9 @@ trait RackAwarePriotization { def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] } -class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { +class DefaultBlockReplicationPrioritization(host: String) + extends BlockReplicationPriotization + with Logging { /** * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, From 9275feb12dc304ca906df5323a36ffc5e74ef62d Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 12 May 2016 17:15:46 -0400 Subject: [PATCH 07/25] Renaming classes/variables from rack to a more general topology. --- .../apache/spark/storage/BlockManager.scala | 22 +++++++++---------- .../spark/storage/BlockManagerMaster.scala | 2 +- .../storage/BlockManagerMasterEndpoint.scala | 4 ++-- .../apache/spark/storage/TopologyMapper.scala | 6 ++--- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 152b2be47b81..4d34d8fbad53 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -148,7 +148,7 @@ private[spark] class BlockManager( private val peerFetchLock = new Object private var lastPeerFetchTime = 0L - private var rackAwarePrioritizer: BlockReplicationPriotization = _ + private var blockReplicationPrioritizer: BlockReplicationPriotization = _ /** * Initializes the BlockManager with the given appId. This is not performed in the constructor as @@ -163,27 +163,27 @@ private[spark] class BlockManager( blockTransferService.init(this) shuffleClient.init(appId) - val rackInfo = { - val rackStr = master.getRackInfo(blockTransferService.hostName) - if (rackStr == null || rackStr.isEmpty) { + val topologyInfo = { + val topologyStr = master.getTopologyInfo(blockTransferService.hostName) + if (topologyStr == null || topologyStr.isEmpty) { None } else { - Some(rackStr) + Some(topologyStr) } } - val priorityClass = conf.get("spark.replication.rackawareness.prioritizer", "") - rackAwarePrioritizer = if (!priorityClass.isEmpty) { + val priorityClass = conf.get("spark.replication.topologyawareness.prioritizer", "") + blockReplicationPrioritizer = if (!priorityClass.isEmpty) { val ret = Utils.classForName(priorityClass).asInstanceOf[BlockReplicationPriotization] logInfo(s"Using $priorityClass for prioritizing peers") ret } else { - logInfo("Using DefaultRackAwarePrioritization for prioritizing peers") + logInfo("Using DefaultBlockReplicationPrioritization for prioritizing peers") new DefaultBlockReplicationPrioritization(blockTransferService.hostName) } blockManagerId = BlockManagerId( - executorId, blockTransferService.hostName, blockTransferService.port, rackInfo) + executorId, blockTransferService.hostName, blockTransferService.port, topologyInfo) shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort") @@ -1204,7 +1204,7 @@ private[spark] class BlockManager( val filteredPeers = getPeers(true).filter{p => !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) } - val updatedPeers = rackAwarePrioritizer.prioritize(filteredPeers, blockId) + val updatedPeers = blockReplicationPrioritizer.prioritize(filteredPeers, blockId) (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers) } @@ 
-1214,7 +1214,7 @@ private[spark] class BlockManager( val startTime = System.currentTimeMillis val peersReplicatedTo = replicateBlock(0, - rackAwarePrioritizer.prioritize(getPeers(false), blockId), + blockReplicationPrioritizer.prioritize(getPeers(false), blockId), Set.empty, Set.empty) logDebug(s"Replicating $blockId of ${data.size} bytes to " + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 4f9af25fd6e0..400050927e93 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -227,7 +227,7 @@ class BlockManagerMaster( } } - def getRackInfo(host: String): String = { + def getTopologyInfo(host: String): String = { driverEndpoint.askWithRetry[String](GetTopologyInfo(host)) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index fb5045e795c7..0478a84dab6a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -56,7 +56,7 @@ class BlockManagerMasterEndpoint( private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) private val topologyMapperClassName = - conf.get("spark.replication.rackawareness.topologyMapper", "") + conf.get("spark.replication.topologyawareness.topologyMapper", "") private val topologyMapper = if (!topologyMapperClassName.isEmpty) { val ret = Utils.classForName(topologyMapperClassName).asInstanceOf[TopologyMapper] @@ -144,7 +144,7 @@ class BlockManagerMasterEndpoint( } private def getTopologyInfoForHost(host: String): String = { - topologyMapper.getRackForHost(host) + topologyMapper.getTopologyForHost(host) } private def removeRdd(rddId: Int): Future[Seq[Int]] = { diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index f579cced59e8..77f1cf3f4422 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -28,14 +28,12 @@ trait TopologyMapper { * to make this rack information nested. * For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter, * ‘myrack’ is the rack identifier, and ‘myhost’ is the individual host. - * Note that this would only help if the RackAwarePrioritization implementation - * being provided is smart enough to make use of all this information. */ - def getRackForHost(hostname: String): String + def getTopologyForHost(hostname: String): String } class DefaultTopologyMapper extends TopologyMapper with Logging { - override def getRackForHost(hostname: String): String = { + override def getTopologyForHost(hostname: String): String = { logDebug(s"Got a request for $hostname") "DefaultRack" } From 5d4178b4f56c3aff65acc6a5bfb8585bf2517044 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 12 May 2016 17:27:29 -0400 Subject: [PATCH 08/25] We continue to randomly choose peers, so there is no change in current behavior. 
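
Shuffling the whole peer list under a block-derived seed, as the diff below does, keeps the ordering deterministic for a given block while still spreading different blocks across different peers. A small illustration (block and peer names are invented):

    import scala.util.Random

    val peers = Seq("exec-1", "exec-2", "exec-3", "exec-4", "exec-5")
    val orderForBlockA = new Random("rdd_0_0".hashCode).shuffle(peers)
    val orderForBlockB = new Random("rdd_0_1".hashCode).shuffle(peers)
    // Stable per block across retries, but different blocks typically fan
    // out to different peers instead of all targeting the same one.
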
--- .../apache/spark/storage/BlockReplicationPrioritization.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 8680568f6976..c2d9d37ace22 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -52,9 +52,7 @@ class DefaultBlockReplicationPrioritization(host: String) val random = new Random(blockId.hashCode) logDebug(s"Input peers : ${peers.mkString(", ")}") - val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) - val peersOnHost = peers.filter(p => p.host.equals(host)) - val ret = random.shuffle(peersOnOtherHosts) ++ random.shuffle(peersOnHost) + val ret = random.shuffle(peers) logDebug(s"Prioritized peers : ${ret.mkString(", ")}") ret } From 2e3c19522224ff6679672f9e0b38e831b5090d72 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 13 May 2016 11:36:17 -0400 Subject: [PATCH 09/25] Spelling correction and minor changes in comments to use a more general topology instead of rack. --- .../scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- .../spark/storage/BlockReplicationPrioritization.scala | 4 ++-- .../scala/org/apache/spark/storage/TopologyMapper.scala | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4d34d8fbad53..30f1fa625d5d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -148,7 +148,7 @@ private[spark] class BlockManager( private val peerFetchLock = new Object private var lastPeerFetchTime = 0L - private var blockReplicationPrioritizer: BlockReplicationPriotization = _ + private var blockReplicationPrioritizer: BlockReplicationPrioritization = _ /** * Initializes the BlockManager with the given appId. 
This is not performed in the constructor as @@ -174,7 +174,7 @@ private[spark] class BlockManager( val priorityClass = conf.get("spark.replication.topologyawareness.prioritizer", "") blockReplicationPrioritizer = if (!priorityClass.isEmpty) { - val ret = Utils.classForName(priorityClass).asInstanceOf[BlockReplicationPriotization] + val ret = Utils.classForName(priorityClass).asInstanceOf[BlockReplicationPrioritization] logInfo(s"Using $priorityClass for prioritizing peers") ret } else { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index c2d9d37ace22..1464658fcebb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ -trait BlockReplicationPriotization { +trait BlockReplicationPrioritization { /** * Method to prioritize a bunch of candidate peers of a block @@ -36,7 +36,7 @@ trait BlockReplicationPriotization { } class DefaultBlockReplicationPrioritization(host: String) - extends BlockReplicationPriotization + extends BlockReplicationPrioritization with Logging { /** diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index 77f1cf3f4422..d15641c515c0 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -21,13 +21,13 @@ import org.apache.spark.internal.Logging trait TopologyMapper { /** - * Gets the rack information given the host name + * Gets the topology information given the host name * * @param hostname Hostname - * @return rack information for the given hostname. One can use a 'topology delimiter' - * to make this rack information nested. + * @return topology information for the given hostname. One can use a 'topology delimiter' + * to make this topology information nested. * For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter, - * ‘myrack’ is the rack identifier, and ‘myhost’ is the individual host. + * ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host. */ def getTopologyForHost(hostname: String): String } From d481b693c11912ca06847de51e1c373e3d704c18 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 13 May 2016 16:32:13 -0400 Subject: [PATCH 10/25] Minor change. Changing replication info message to debug level. 
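
With the interface settled by the spelling fix in patch 09, a topology-aware implementation of the trait as it stands at this point in the series might look like the sketch below. The class name and the preference order are hypothetical, it assumes the org.apache.spark.storage package context, and patch 11 that follows widens prioritize with a peersReplicatedTo argument that such an implementation could also exploit:

    import scala.util.Random

    class TopologyAwarePrioritization(myTopology: Option[String])
      extends BlockReplicationPrioritization {

      override def prioritize(
          peers: Seq[BlockManagerId],
          blockId: BlockId): Seq[BlockManagerId] = {
        val random = new Random(blockId.hashCode)
        // Prefer peers reporting a different topology string, so replicas
        // land off-rack first; fall back to same-rack peers afterwards.
        val (sameRack, otherRacks) = peers.partition(_.topologyInfo == myTopology)
        random.shuffle(otherRacks) ++ random.shuffle(sameRack)
      }
    }

Wiring this in would also mean passing the node's own topology string to the constructor, where the default implementation currently receives only the host name.
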
--- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 30f1fa625d5d..9162355422d0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1224,7 +1224,7 @@ private[spark] class BlockManager( s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers") } - logInfo(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") + logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") } /** From 762b8d41efae95ad97d38f5f8179341ad0315873 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Mon, 16 May 2016 17:47:33 -0400 Subject: [PATCH 11/25] Providing peersReplicateTo to the prioritizer. --- .../org/apache/spark/storage/BlockManager.scala | 5 +++-- .../storage/BlockReplicationPrioritization.scala | 12 +++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9162355422d0..81f773c01f9d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1204,7 +1204,8 @@ private[spark] class BlockManager( val filteredPeers = getPeers(true).filter{p => !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) } - val updatedPeers = blockReplicationPrioritizer.prioritize(filteredPeers, blockId) + val updatedPeers = + blockReplicationPrioritizer.prioritize(filteredPeers, peersReplicatedTo, blockId) (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers) } @@ -1214,7 +1215,7 @@ private[spark] class BlockManager( val startTime = System.currentTimeMillis val peersReplicatedTo = replicateBlock(0, - blockReplicationPrioritizer.prioritize(getPeers(false), blockId), + blockReplicationPrioritizer.prioritize(getPeers(false), Set.empty, blockId), Set.empty, Set.empty) logDebug(s"Replicating $blockId of ${data.size} bytes to " + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 1464658fcebb..c348e039b98f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -26,13 +26,16 @@ trait BlockReplicationPrioritization { /** * Method to prioritize a bunch of candidate peers of a block - * + * * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority */ - def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] + def prioritize(peers: Seq[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] } class DefaultBlockReplicationPrioritization(host: String) @@ -44,11 +47,14 @@ class DefaultBlockReplicationPrioritization(host: String) * that just makes sure we put blocks on different hosts, if possible * * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { + override def prioritize(peers: Seq[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) logDebug(s"Input peers : ${peers.mkString(", ")}") From 6df3cebfa1c06091f69a1f8706ecae59023fb397 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 17 May 2016 15:25:34 -0400 Subject: [PATCH 12/25] Adding developer api annotations to TopologyMapper and BlockReplicationPrioritization --- .../storage/BlockReplicationPrioritization.scala | 11 +++++++++-- .../org/apache/spark/storage/TopologyMapper.scala | 7 +++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index c348e039b98f..f0636610db45 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -17,11 +17,17 @@ package org.apache.spark.storage -import scala.util.Random +import org.apache.spark.annotation.DeveloperApi +import scala.util.Random import org.apache.spark.internal.Logging -/* Trait that should be implemented by any class implementing rack aware prioritization */ +/** + * ::DeveloperApi:: + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for + * replicating blocks + */ +@DeveloperApi trait BlockReplicationPrioritization { /** @@ -38,6 +44,7 @@ trait BlockReplicationPrioritization { blockId: BlockId): Seq[BlockManagerId] } +@DeveloperApi class DefaultBlockReplicationPrioritization(host: String) extends BlockReplicationPrioritization with Logging { diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index d15641c515c0..1db072b4bbc5 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -17,8 +17,14 @@ package org.apache.spark.storage +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging +/** + * ::DeveloperApi:: + * TopologyMapper provides topology information for a given host + */ +@DeveloperApi trait TopologyMapper { /** * Gets the topology information given the host name @@ -32,6 +38,7 @@ trait TopologyMapper { def getTopologyForHost(hostname: String): String } +@DeveloperApi class DefaultTopologyMapper extends TopologyMapper with Logging { override def getTopologyForHost(hostname: String): String = { 
logDebug(s"Got a request for $hostname") From a6dbf3f4b5ebb3b67df6f274bba6b9817b91cd1a Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Wed, 18 May 2016 16:52:22 -0400 Subject: [PATCH 13/25] Changes recommended by @HyukjinKwon to fix style issues. --- .../org/apache/spark/storage/BlockManager.scala | 9 +++++---- .../org/apache/spark/storage/BlockManagerId.scala | 6 +++--- .../spark/storage/BlockManagerMasterEndpoint.scala | 1 - .../storage/BlockReplicationPrioritization.scala | 12 ++++++------ 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 81f773c01f9d..9a2bfa6f975c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1166,8 +1166,8 @@ private[spark] class BlockManager( peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = { if (numFailures > maxReplicationFailures - || peersForReplication.isEmpty - || peersReplicatedTo.size == numPeersToReplicateTo) { + || peersForReplication.isEmpty + || peersReplicatedTo.size == numPeersToReplicateTo) { // This selection of a peer and replication is continued in a loop until one of the // following 3 conditions is fulfilled: // (i) specified number of peers have been replicated to @@ -1201,7 +1201,7 @@ private[spark] class BlockManager( // we have a failed replication, so we get the list of peers again // we don't want peers we have already replicated to and the ones that // have failed previously - val filteredPeers = getPeers(true).filter{p => + val filteredPeers = getPeers(true).filter { p => !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) } val updatedPeers = @@ -1214,7 +1214,8 @@ private[spark] class BlockManager( } val startTime = System.currentTimeMillis - val peersReplicatedTo = replicateBlock(0, + val peersReplicatedTo = replicateBlock( + 0, blockReplicationPrioritizer.prioritize(getPeers(false), Set.empty, blockId), Set.empty, Set.empty) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index c825154c1431..9b585b85c5e8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -122,9 +122,9 @@ private[spark] object BlockManagerId { * @return A new [[org.apache.spark.storage.BlockManagerId]]. 
*/ def apply(execId: String, - host: String, - port: Int, - topologyInfo: Option[String] = None): BlockManagerId = + host: String, + port: Int, + topologyInfo: Option[String] = None): BlockManagerId = getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo)) def apply(in: ObjectInput): BlockManagerId = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 0478a84dab6a..596631207c31 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -140,7 +140,6 @@ class BlockManagerMasterEndpoint( } case GetTopologyInfo(host) => context.reply(getTopologyInfoForHost(host)) - } private def getTopologyInfoForHost(host: String): String = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index f0636610db45..8c7903a0579d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -17,9 +17,9 @@ package org.apache.spark.storage -import org.apache.spark.annotation.DeveloperApi - import scala.util.Random + +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging /** @@ -40,8 +40,8 @@ trait BlockReplicationPrioritization { * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ def prioritize(peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] } @DeveloperApi @@ -60,8 +60,8 @@ class DefaultBlockReplicationPrioritization(host: String) * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ override def prioritize(peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] = { + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) logDebug(s"Input peers : ${peers.mkString(", ")}") From 487bfae11f9ea96aa20d847af9785a1b63f951a6 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 20 May 2016 14:43:07 -0400 Subject: [PATCH 14/25] Updating prioritizer api to use current blockmanager id for self identification. 
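With this change a policy can compare candidate peers against the block manager's own identity. Below is a sketch of a rack-diversity policy written against the updated four-argument signature; it is illustrative rather than part of the patch, it assumes BlockManagerId exposes its optional topologyInfo, and it keeps a no-argument constructor because the prioritizer is now instantiated via newInstance.

  import scala.util.Random

  import org.apache.spark.storage.{BlockId, BlockManagerId, BlockReplicationPrioritization}

  // Hypothetical policy: racks that already hold a copy (including this
  // block manager's own rack) are visited last, so replicas spread across
  // the topology first; each group is shuffled with blockId-seeded
  // randomness, as the default implementation does.
  class RackDiversityPrioritization extends BlockReplicationPrioritization {
    override def prioritize(
        blockManagerId: BlockManagerId,
        peers: Seq[BlockManagerId],
        peersReplicatedTo: Set[BlockManagerId],
        blockId: BlockId): Seq[BlockManagerId] = {
      val random = new Random(blockId.hashCode)
      // racks already covered by existing replicas and by this block manager
      val coveredRacks = (peersReplicatedTo + blockManagerId).flatMap(_.topologyInfo)
      val (fresh, covered) =
        peers.partition(_.topologyInfo.forall(rack => !coveredRacks.contains(rack)))
      random.shuffle(fresh) ++ random.shuffle(covered)
    }
  }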
--- .../apache/spark/storage/BlockManager.scala | 21 +++++++++++-------- .../BlockReplicationPrioritization.scala | 10 ++++++--- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9a2bfa6f975c..7d4df49c1b45 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -172,14 +172,14 @@ private[spark] class BlockManager( } } - val priorityClass = conf.get("spark.replication.topologyawareness.prioritizer", "") - blockReplicationPrioritizer = if (!priorityClass.isEmpty) { - val ret = Utils.classForName(priorityClass).asInstanceOf[BlockReplicationPrioritization] + blockReplicationPrioritizer = { + val priorityClass = conf.get( + "spark.replication.topologyawareness.prioritizer", + "org.apache.spark.storage.DefaultBlockReplicationPrioritization") + val clazz = Utils.classForName(priorityClass) + val ret = clazz.newInstance.asInstanceOf[BlockReplicationPrioritization] logInfo(s"Using $priorityClass for prioritizing peers") ret - } else { - logInfo("Using DefaultBlockReplicationPrioritization for prioritizing peers") - new DefaultBlockReplicationPrioritization(blockTransferService.hostName) } blockManagerId = BlockManagerId( @@ -1204,8 +1204,11 @@ private[spark] class BlockManager( val filteredPeers = getPeers(true).filter { p => !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) } - val updatedPeers = - blockReplicationPrioritizer.prioritize(filteredPeers, peersReplicatedTo, blockId) + val updatedPeers = blockReplicationPrioritizer.prioritize( + blockManagerId, + filteredPeers, + peersReplicatedTo, + blockId) (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers) } @@ -1216,7 +1219,7 @@ private[spark] class BlockManager( val startTime = System.currentTimeMillis val peersReplicatedTo = replicateBlock( 0, - blockReplicationPrioritizer.prioritize(getPeers(false), Set.empty, blockId), + blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId), Set.empty, Set.empty) logDebug(s"Replicating $blockId of ${data.size} bytes to " + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 8c7903a0579d..457f7813822f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -33,19 +33,21 @@ trait BlockReplicationPrioritization { /** * Method to prioritize a bunch of candidate peers of a block * + * @param blockManagerId Id of the current BlockManager for self identification * @param peers A list of peers of a BlockManager * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority */ - def prioritize(peers: Seq[BlockManagerId], + def prioritize(blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] } @DeveloperApi -class DefaultBlockReplicationPrioritization(host: String) +class DefaultBlockReplicationPrioritization extends BlockReplicationPrioritization with Logging { @@ -53,13 +55,15 @@ class DefaultBlockReplicationPrioritization(host: String) * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, * that just makes sure we put blocks on different hosts, if possible * + * @param blockManagerId Id of the current BlockManager for self identification * @param peers A list of peers of a BlockManager * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(peers: Seq[BlockManagerId], + override def prioritize(blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) From 2ef5199e7032575c83d6cca8930d63f3a443a9c9 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 29 Jul 2016 17:45:19 -0400 Subject: [PATCH 15/25] BlockManager initialization now only uses a single message to the master to get a fully fleshed-out id, with topology information, if available. --- .../apache/spark/storage/BlockManager.scala | 22 +++++------- .../spark/storage/BlockManagerMaster.scala | 12 ++++--- .../storage/BlockManagerMasterEndpoint.scala | 36 +++++++++++-------- 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 7d4df49c1b45..365d80007871 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -163,15 +163,6 @@ private[spark] class BlockManager( blockTransferService.init(this) shuffleClient.init(appId) - val topologyInfo = { - val topologyStr = master.getTopologyInfo(blockTransferService.hostName) - if (topologyStr == null || topologyStr.isEmpty) { - None - } else { - Some(topologyStr) - } - } - blockReplicationPrioritizer = { val priorityClass = conf.get( "spark.replication.topologyawareness.prioritizer", @@ -182,8 +173,15 @@ private[spark] class BlockManager( ret } - blockManagerId = BlockManagerId( - executorId, blockTransferService.hostName, blockTransferService.port, topologyInfo) + val id = + BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None) + + val idFromMaster = master.registerBlockManager( + id, + maxMemory, + slaveEndpoint) + + blockManagerId = if (idFromMaster != null) idFromMaster else id shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort") @@ -192,8 +190,6 @@ private[spark] class BlockManager( blockManagerId } - master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint) - // Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { registerWithExternalShuffleServer() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 400050927e93..fb2eee81e49a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -52,10 +52,14 @@ class BlockManagerMaster( /** Register the BlockManager's id with the driver. */ def registerBlockManager( - blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = { - logInfo(s"Registering BlockManager $blockManagerId") - tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) - logInfo(s"Registered BlockManager $blockManagerId") + blockManagerId: BlockManagerId, + maxMemSize: Long, + slaveEndpoint: RpcEndpointRef): BlockManagerId = { + logInfo(s"Trying to register BlockManager $blockManagerId") + val updatedId = driverEndpoint.askWithRetry[BlockManagerId]( + RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) + logInfo(s"Registered BlockManager $updatedId") + updatedId } def updateBlockInfo( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 596631207c31..0f70189aee6f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -55,24 +55,21 @@ class BlockManagerMasterEndpoint( private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool") private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) - private val topologyMapperClassName = - conf.get("spark.replication.topologyawareness.topologyMapper", "") - - private val topologyMapper = if (!topologyMapperClassName.isEmpty) { - val ret = Utils.classForName(topologyMapperClassName).asInstanceOf[TopologyMapper] - logInfo(s"Using $topologyMapperClassName for mapping host names to topology") - ret - } else { - logInfo(s"Using DefaultTopologyMapper to map host names to topology") - new DefaultTopologyMapper + private val topologyMapper = { + val topologyMapperClassName = conf.get( + "spark.replication.topologyawareness.topologyMapper", + "org.apache.spark.storage.DefaultTopologyMapper") + val clazz = Utils.classForName(topologyMapperClassName) + val mapper = clazz.newInstance.asInstanceOf[TopologyMapper] + logInfo(s"Using $topologyMapperClassName for getting topology information") + mapper } logInfo("BlockManagerMasterEndpoint up") override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) => - register(blockManagerId, maxMemSize, slaveEndpoint) - context.reply(true) + context.reply(register(blockManagerId, maxMemSize, slaveEndpoint)) case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => @@ -138,8 +135,6 @@ class BlockManagerMasterEndpoint( } case None => context.reply(false) } - - case GetTopologyInfo(host) => context.reply(getTopologyInfoForHost(host)) } private def getTopologyInfoForHost(host: String): String = { @@ -318,7 +313,17 @@ class BlockManagerMasterEndpoint( ).map(_.flatten.toSeq) } - private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: 
RpcEndpointRef) { + private def register(dummyId: BlockManagerId, + maxMemSize: Long, + slaveEndpoint: RpcEndpointRef): BlockManagerId = { + // the dummy id is not expected to contain the topology information. + // we get that info here and respond back with a more fleshed out block manager id + val id = BlockManagerId( + dummyId.executorId, + dummyId.host, + dummyId.port, + Some(getTopologyInfoForHost(dummyId.host))) + val time = System.currentTimeMillis() if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { @@ -338,6 +343,7 @@ class BlockManagerMasterEndpoint( id, System.currentTimeMillis(), maxMemSize, slaveEndpoint) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) + id } private def updateBlockInfo( From 00c6d0cf0daf3ebd3fe195df7d27ef1162377ec3 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 4 Aug 2016 15:52:54 -0400 Subject: [PATCH 16/25] 1. Changing tail recursive function in BlockManager to an imperative while loop, as suggested by @ericl 2. Adding SparkConf constructor arguments to TopologyMapper, so any required properties like classname or file/script names can be passed to a custom topology mapper. --- .../apache/spark/storage/BlockManager.scala | 100 ++++++++---------- .../storage/BlockManagerMasterEndpoint.scala | 3 +- .../apache/spark/storage/TopologyMapper.scala | 9 +- 3 files changed, 52 insertions(+), 60 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 365d80007871..3ba2f078976b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1155,69 +1155,55 @@ private[spark] class BlockManager( val numPeersToReplicateTo = level.replication - 1 - @tailrec def replicateBlock( - numFailures: Int, - peersForReplication: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = { + val startTime = System.currentTimeMillis + + var peersForReplication = + blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId) + var peersReplicatedTo = Set.empty[BlockManagerId] + var peersFailedToReplicateTo = Set.empty[BlockManagerId] + var numFailures = 0 - if (numFailures > maxReplicationFailures + while(!(numFailures > maxReplicationFailures || peersForReplication.isEmpty - || peersReplicatedTo.size == numPeersToReplicateTo) { - // This selection of a peer and replication is continued in a loop until one of the - // following 3 conditions is fulfilled: - // (i) specified number of peers have been replicated to - // (ii) too many failures in replicating to peers - // (iii) no peer left to replicate to - peersReplicatedTo - } else { - val peer = peersForReplication.head - val (updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) = try { - val onePeerStartTime = System.currentTimeMillis - logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") - blockTransferService.uploadBlockSync( - peer.host, - peer.port, - peer.executorId, - blockId, - new NettyManagedBuffer(data.toNetty), - tLevel, - classTag) - logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" + - s" in ${System.currentTimeMillis - onePeerStartTime} ms") - // the block was replicated, lets update state and move ahead - (numFailures, - peersForReplication.tail, - peersReplicatedTo + peer, - 
peersFailedToReplicateTo) - } catch { - case NonFatal(e) => - logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e) - val updatedFailedPeers = peersFailedToReplicateTo + peer - // we have a failed replication, so we get the list of peers again - // we don't want peers we have already replicated to and the ones that - // have failed previously - val filteredPeers = getPeers(true).filter { p => - !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p)) - } - val updatedPeers = blockReplicationPrioritizer.prioritize( - blockManagerId, - filteredPeers, - peersReplicatedTo, - blockId) - (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers) - } + || peersReplicatedTo.size == numPeersToReplicateTo)) { + val peer = peersForReplication.head + try { + val onePeerStartTime = System.currentTimeMillis + logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") + blockTransferService.uploadBlockSync( + peer.host, + peer.port, + peer.executorId, + blockId, + new NettyManagedBuffer(data.toNetty), + tLevel, + classTag) + logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" + + s" in ${System.currentTimeMillis - onePeerStartTime} ms") + // the block was replicated, lets update state and move ahead + + peersForReplication = peersForReplication.tail + peersReplicatedTo += peer + } catch { + case NonFatal(e) => + logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e) + peersFailedToReplicateTo += peer + // we have a failed replication, so we get the list of peers again + // we don't want peers we have already replicated to and the ones that + // have failed previously + val filteredPeers = getPeers(true).filter { p => + !(peersFailedToReplicateTo.contains(p) || peersReplicatedTo.contains(p)) + } - replicateBlock(updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) + numFailures += 1 + peersForReplication = blockReplicationPrioritizer.prioritize( + blockManagerId, + filteredPeers, + peersReplicatedTo, + blockId) } } - val startTime = System.currentTimeMillis - val peersReplicatedTo = replicateBlock( - 0, - blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId), - Set.empty, - Set.empty) logDebug(s"Replicating $blockId of ${data.size} bytes to " + s"${peersReplicatedTo.size} peer(s) took ${System.currentTimeMillis - startTime} ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 0f70189aee6f..10631c17095d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -60,7 +60,8 @@ class BlockManagerMasterEndpoint( "spark.replication.topologyawareness.topologyMapper", "org.apache.spark.storage.DefaultTopologyMapper") val clazz = Utils.classForName(topologyMapperClassName) - val mapper = clazz.newInstance.asInstanceOf[TopologyMapper] + val mapper = + clazz.getConstructor(conf.getClass).newInstance(conf).asInstanceOf[TopologyMapper] logInfo(s"Using $topologyMapperClassName for getting topology information") mapper } diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index 1db072b4bbc5..75b9995da8ac 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -17,15 +17,19 @@ package org.apache.spark.storage +import java.util.Properties + +import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging /** * ::DeveloperApi:: * TopologyMapper provides topology information for a given host + * @param conf SparkConf to get required properties, if needed */ @DeveloperApi -trait TopologyMapper { +abstract class TopologyMapper(conf: SparkConf) { /** * Gets the topology information given the host name * @@ -39,9 +43,10 @@ trait TopologyMapper { } @DeveloperApi -class DefaultTopologyMapper extends TopologyMapper with Logging { +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging { override def getTopologyForHost(hostname: String): String = { logDebug(s"Got a request for $hostname") "DefaultRack" } } + From 52c673e5a78ae6d3992b81bd7bfe9458109e3f16 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 5 Aug 2016 14:34:37 -0400 Subject: [PATCH 17/25] Fixing style issues. --- .../scala/org/apache/spark/storage/BlockManagerId.scala | 9 ++++----- .../org/apache/spark/storage/BlockManagerMaster.scala | 8 ++++++-- .../org/apache/spark/storage/BlockManagerMessages.scala | 2 -- .../spark/storage/BlockReplicationPrioritization.scala | 6 ++++-- .../scala/org/apache/spark/storage/TopologyMapper.scala | 4 +--- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index 9b585b85c5e8..d576bfd03941 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -73,8 +73,8 @@ class BlockManagerId private ( out.writeUTF(host_) out.writeInt(port_) out.writeBoolean(topologyInfo_.isDefined) - // if we don't keep topology information, we just write an empty string. - out.writeUTF(topologyInfo_.getOrElse("")) + // we only write topologyInfo if we have it + topologyInfo.foreach(out.writeUTF(_)) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { @@ -85,8 +85,6 @@ class BlockManagerId private ( topologyInfo_ = if (isTopologyInfoAvailable) { Some(in.readUTF()) } else { - // we would read an empty string in this case - in.readUTF() None } } @@ -121,7 +119,8 @@ private[spark] object BlockManagerId { * @param port Port of the block manager. * @return A new [[org.apache.spark.storage.BlockManagerId]]. */ - def apply(execId: String, + def apply( + execId: String, host: String, port: Int, topologyInfo: Option[String] = None): BlockManagerId = diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index fb2eee81e49a..e68e7d6023c1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -50,12 +50,16 @@ class BlockManagerMaster( logInfo("Removal of executor " + execId + " requested") } - /** Register the BlockManager's id with the driver. */ + /** + * Register the BlockManager's id with the driver. The input BlockManagerId does not contain + * topology information. This information is obtained from the master and we respond with an + * updated BlockManagerId fleshed out with this information. 
+ */ def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { - logInfo(s"Trying to register BlockManager $blockManagerId") + logInfo(s"Trying to register BlockManager $blockManagerId") val updatedId = driverEndpoint.askWithRetry[BlockManagerId]( RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index d724755eeb81..6bded9270050 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -111,6 +111,4 @@ private[spark] object BlockManagerMessages { case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster - - case class GetTopologyInfo(host: String) extends ToBlockManagerMaster } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 457f7813822f..77911620ad6e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -40,7 +40,8 @@ trait BlockReplicationPrioritization { * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - def prioritize(blockManagerId: BlockManagerId, + def prioritize( + blockManagerId: BlockManagerId, peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] @@ -62,7 +63,8 @@ class DefaultBlockReplicationPrioritization * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(blockManagerId: BlockManagerId, + override def prioritize( + blockManagerId: BlockManagerId, peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index 75b9995da8ac..6799e55c7a9a 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -17,11 +17,9 @@ package org.apache.spark.storage -import java.util.Properties - -import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging +import org.apache.spark.SparkConf /** * ::DeveloperApi:: From c6b252b59c046ad4b1cbdb274fbde92f73931953 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 5 Aug 2016 14:38:34 -0400 Subject: [PATCH 18/25] Fixing style issues. 
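The Externalizable change in the previous patch replaces the write-an-empty-string encoding with a presence flag followed by the payload only when it exists, which means the reader and the writer must branch identically or the stream goes out of alignment. A self-contained sketch of the same pattern, with hypothetical helper names rather than Spark API:

  import java.io.{ObjectInput, ObjectOutput}

  object OptionalFieldCodec {
    // Write a Boolean presence flag, then the UTF string only when defined.
    def writeOption(out: ObjectOutput, v: Option[String]): Unit = {
      out.writeBoolean(v.isDefined)
      v.foreach(out.writeUTF)
    }

    // Mirror the writer: consume the flag first, then conditionally read
    // the string, so reads stay aligned with what was written.
    def readOption(in: ObjectInput): Option[String] =
      if (in.readBoolean()) Some(in.readUTF()) else None
  }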
--- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +++--- .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 4 ---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3ba2f078976b..b4cf04715338 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1163,9 +1163,9 @@ private[spark] class BlockManager( var peersFailedToReplicateTo = Set.empty[BlockManagerId] var numFailures = 0 - while(!(numFailures > maxReplicationFailures - || peersForReplication.isEmpty - || peersReplicatedTo.size == numPeersToReplicateTo)) { + while(numFailures <= maxReplicationFailures + && !peersForReplication.isEmpty + && peersReplicatedTo.size != numPeersToReplicateTo) { val peer = peersForReplication.head try { val onePeerStartTime = System.currentTimeMillis diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e68e7d6023c1..d9e662d9b287 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -235,10 +235,6 @@ class BlockManagerMaster( } } - def getTopologyInfo(host: String): String = { - driverEndpoint.askWithRetry[String](GetTopologyInfo(host)) - } - /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */ private def tell(message: Any) { if (!driverEndpoint.askWithRetry[Boolean](message)) { From 59e91a74226d88e4285f9085cfff679077fc1533 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 9 Aug 2016 11:24:42 -0400 Subject: [PATCH 19/25] Adding documentation to clarify on topology. --- .../main/scala/org/apache/spark/storage/BlockManagerId.scala | 4 ++++ .../main/scala/org/apache/spark/storage/TopologyMapper.scala | 1 + 2 files changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index d576bfd03941..dde2406fb4c2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -117,6 +117,10 @@ private[spark] object BlockManagerId { * @param execId ID of the executor. * @param host Host name of the block manager. * @param port Port of the block manager. + * @param topologyInfo topology information for the blockmanager, if available + * This can be network topology information for use while choosing peers + * while replicating data blocks. More information available here: + * [[org.apache.spark.storage.TopologyMapper]] * @return A new [[org.apache.spark.storage.BlockManagerId]]. */ def apply( diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index 6799e55c7a9a..d27af45dd38e 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -36,6 +36,7 @@ abstract class TopologyMapper(conf: SparkConf) { * to make this topology information nested. * For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter, * ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host. 
+ * This function only returns the topology information without the hostname. */ def getTopologyForHost(hostname: String): String } From e695cd25a8603223879223a9c821b4ad6d465ede Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 9 Aug 2016 14:54:49 -0400 Subject: [PATCH 20/25] Adding a simple file based topology-mapper. --- .../apache/spark/storage/TopologyMapper.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index d27af45dd38e..b9c9b5bf7c23 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -17,9 +17,13 @@ package org.apache.spark.storage +import java.io.{File, FileInputStream} +import java.util.Properties + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.SparkConf +import org.apache.spark.util.Utils /** * ::DeveloperApi:: @@ -49,3 +53,30 @@ class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with L } } +/** + * A simple file based topology mapper. This expects topology information provided as a + * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property + * `spark.replication.topologyawareness.topologyfile`. To use this topology mapper, set the + * `spark.replication.topologyawareness.topologyMapper` property to + * [[org.apache.spark.storage.FileBasedTopologyMapper]] + * @param conf SparkConf object + */ +@DeveloperApi +class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging { + + val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile") + require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.") + val topologyMap = Utils.getPropertiesFromFile(topologyFile.get) + + override def getTopologyForHost(hostname: String): String = { + val topology = topologyMap.get(hostname) + if(topology.isDefined) { + logDebug(s"$hostname -> ${topology.get}") + topology.get + } else { + logWarning(s"$hostname does not have any topology information") + "" + } + } +} + From 5c33cc846b440c12443f467b36fd2fdaf9df0791 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Wed, 10 Aug 2016 17:59:22 -0400 Subject: [PATCH 21/25] Fixing style issues. 
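The FileBasedTopologyMapper added in the preceding patch reads a plain java.util.Properties file that maps each hostname to its topology string. A made-up example, with illustrative hostnames and path:

  # /etc/spark/topology.properties
  host-1.example.com=/rack-a
  host-2.example.com=/rack-a
  host-3.example.com=/rack-b

Wiring it up through SparkConf might look like the following sketch. Note that the lookup in the code reads the all-lowercase key spark.replication.topologyawareness.topologyfile, even though the scaladoc in this patch spells it topologyFile.

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // mapper that resolves hostnames via the properties file above
    .set("spark.replication.topologyawareness.topologyMapper",
      "org.apache.spark.storage.FileBasedTopologyMapper")
    // the path is an assumption for this example
    .set("spark.replication.topologyawareness.topologyfile",
      "/etc/spark/topology.properties")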
--- .../scala/org/apache/spark/storage/BlockManager.scala | 8 +++----- .../apache/spark/storage/BlockManagerMasterEndpoint.scala | 4 +--- .../spark/storage/BlockReplicationPrioritization.scala | 4 +++- .../scala/org/apache/spark/storage/TopologyMapper.scala | 5 ++--- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b4cf04715338..b6d95f046540 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1163,9 +1163,9 @@ private[spark] class BlockManager( var peersFailedToReplicateTo = Set.empty[BlockManagerId] var numFailures = 0 - while(numFailures <= maxReplicationFailures - && !peersForReplication.isEmpty - && peersReplicatedTo.size != numPeersToReplicateTo) { + while(numFailures <= maxReplicationFailures && + !peersForReplication.isEmpty && + peersReplicatedTo.size != numPeersToReplicateTo) { val peer = peersForReplication.head try { val onePeerStartTime = System.currentTimeMillis @@ -1180,8 +1180,6 @@ private[spark] class BlockManager( classTag) logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" + s" in ${System.currentTimeMillis - onePeerStartTime} ms") - // the block was replicated, lets update state and move ahead - peersForReplication = peersForReplication.tail peersReplicatedTo += peer } catch { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 10631c17095d..4ad8d4e44ffb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -138,9 +138,7 @@ class BlockManagerMasterEndpoint( } } - private def getTopologyInfoForHost(host: String): String = { - topologyMapper.getTopologyForHost(host) - } + private def getTopologyInfoForHost(h: String): String = topologyMapper.getTopologyForHost(h) private def removeRdd(rddId: Int): Future[Seq[Int]] = { // First remove the metadata for the given RDD, and then asynchronously remove the blocks diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 77911620ad6e..358e23a83c9c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -25,7 +25,9 @@ import org.apache.spark.internal.Logging /** * ::DeveloperApi:: * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for - * replicating blocks + * replicating blocks. BlockManager will replicate to each peer returned in order until the + * desired replication order is reached. If a replication fails, prioritize() will be called + * again to get a fresh prioritization. 
*/ @DeveloperApi trait BlockReplicationPrioritization { diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index b9c9b5bf7c23..70b6f74b0bd5 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -56,21 +56,20 @@ class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with L /** * A simple file based topology mapper. This expects topology information provided as a * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property - * `spark.replication.topologyawareness.topologyfile`. To use this topology mapper, set the + * `spark.replication.topologyawareness.topologyFile`. To use this topology mapper, set the * `spark.replication.topologyawareness.topologyMapper` property to * [[org.apache.spark.storage.FileBasedTopologyMapper]] * @param conf SparkConf object */ @DeveloperApi class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging { - val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile") require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.") val topologyMap = Utils.getPropertiesFromFile(topologyFile.get) override def getTopologyForHost(hostname: String): String = { val topology = topologyMap.get(hostname) - if(topology.isDefined) { + if (topology.isDefined) { logDebug(s"$hostname -> ${topology.get}") topology.get } else { From 90b4e77574df34ea12d8329f6cfa659f85b826b2 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 12 Aug 2016 15:37:41 -0400 Subject: [PATCH 22/25] Inlining call to get topology. --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 4ad8d4e44ffb..fa8473c97c81 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -138,8 +138,6 @@ class BlockManagerMasterEndpoint( } } - private def getTopologyInfoForHost(h: String): String = topologyMapper.getTopologyForHost(h) - private def removeRdd(rddId: Int): Future[Seq[Int]] = { // First remove the metadata for the given RDD, and then asynchronously remove the blocks // from the slaves. 
@@ -321,7 +319,7 @@ class BlockManagerMasterEndpoint( dummyId.executorId, dummyId.host, dummyId.port, - Some(getTopologyInfoForHost(dummyId.host))) + Some(topologyMapper.getTopologyForHost(dummyId.host))) val time = System.currentTimeMillis() if (!blockManagerInfo.contains(id)) { From 6782720b93410f8d1d089b14481eb01aded54dfb Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 12 Aug 2016 15:44:31 -0400 Subject: [PATCH 23/25] converting to not a and not b --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b6d95f046540..fb9837979011 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1190,7 +1190,7 @@ private[spark] class BlockManager( // we don't want peers we have already replicated to and the ones that // have failed previously val filteredPeers = getPeers(true).filter { p => - !(peersFailedToReplicateTo.contains(p) || peersReplicatedTo.contains(p)) + !peersFailedToReplicateTo.contains(p) && !peersReplicatedTo.contains(p) } numFailures += 1 From d8c62106ad8d9a6303b1a9d69f394b0186cb1ad9 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 23 Aug 2016 12:34:09 -0400 Subject: [PATCH 24/25] 1. Style modifications as suggested by @rxin. 2. Changing the BlockReplicationPolicy api to take the number of peers needed and adding a sampling algo linear in time and space along with test cases. --- .../apache/spark/storage/BlockManager.scala | 58 +++++++++-------- .../apache/spark/storage/BlockManagerId.scala | 14 ++-- .../spark/storage/BlockManagerMaster.scala | 2 +- .../storage/BlockManagerMasterEndpoint.scala | 20 +++--- ...ion.scala => BlockReplicationPolicy.scala} | 64 ++++++++++++++----- .../apache/spark/storage/TopologyMapper.scala | 21 +++--- .../BlockManagerReplicationSuite.scala | 45 ++++++++++++- 7 files changed, 150 insertions(+), 74 deletions(-) rename core/src/main/scala/org/apache/spark/storage/{BlockReplicationPrioritization.scala => BlockReplicationPolicy.scala} (56%) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index fb9837979011..7a7fcf38fdb0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,8 +20,8 @@ package org.apache.spark.storage import java.io._ import java.nio.ByteBuffer -import scala.annotation.tailrec -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable +import scala.collection.mutable.HashMap import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -45,6 +45,7 @@ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer + /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( val data: Iterator[Any], @@ -148,7 +149,7 @@ private[spark] class BlockManager( private val peerFetchLock = new Object private var lastPeerFetchTime = 0L - private var blockReplicationPrioritizer: BlockReplicationPrioritization = _ + private var blockReplicationPolicy: BlockReplicationPolicy = _ /** * Initializes the BlockManager with the given appId. 
This is not performed in the constructor as @@ -163,13 +164,13 @@ private[spark] class BlockManager( blockTransferService.init(this) shuffleClient.init(appId) - blockReplicationPrioritizer = { + blockReplicationPolicy = { val priorityClass = conf.get( "spark.replication.topologyawareness.prioritizer", - "org.apache.spark.storage.DefaultBlockReplicationPrioritization") + "org.apache.spark.storage.RandomBlockReplicationPolicy") val clazz = Utils.classForName(priorityClass) - val ret = clazz.newInstance.asInstanceOf[BlockReplicationPrioritization] - logInfo(s"Using $priorityClass for prioritizing peers") + val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy] + logInfo(s"Using $priorityClass for block replication policy") ret } @@ -1133,17 +1134,12 @@ private[spark] class BlockManager( /** * Replicate block to another node. Note that this is a blocking call that returns after * the block has been replicated. - * - * @param blockId - * @param data - * @param level - * @param classTag */ private def replicate( - blockId: BlockId, - data: ChunkedByteBuffer, - level: StorageLevel, - classTag: ClassTag[_]): Unit = { + blockId: BlockId, + data: ChunkedByteBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val tLevel = StorageLevel( @@ -1155,20 +1151,25 @@ private[spark] class BlockManager( val numPeersToReplicateTo = level.replication - 1 - val startTime = System.currentTimeMillis + val startTime = System.nanoTime - var peersForReplication = - blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId) - var peersReplicatedTo = Set.empty[BlockManagerId] - var peersFailedToReplicateTo = Set.empty[BlockManagerId] + var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId] + var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId] var numFailures = 0 + var peersForReplication = blockReplicationPolicy.prioritize( + blockManagerId, + getPeers(false), + mutable.HashSet.empty, + blockId, + numPeersToReplicateTo) + while(numFailures <= maxReplicationFailures && - !peersForReplication.isEmpty && - peersReplicatedTo.size != numPeersToReplicateTo) { + !peersForReplication.isEmpty && + peersReplicatedTo.size != numPeersToReplicateTo) { val peer = peersForReplication.head try { - val onePeerStartTime = System.currentTimeMillis + val onePeerStartTime = System.nanoTime logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") blockTransferService.uploadBlockSync( peer.host, @@ -1179,7 +1180,7 @@ private[spark] class BlockManager( tLevel, classTag) logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" + - s" in ${System.currentTimeMillis - onePeerStartTime} ms") + s" in ${(System.nanoTime - onePeerStartTime).toDouble / 1e6} ms") peersForReplication = peersForReplication.tail peersReplicatedTo += peer } catch { @@ -1194,16 +1195,17 @@ private[spark] class BlockManager( } numFailures += 1 - peersForReplication = blockReplicationPrioritizer.prioritize( + peersForReplication = blockReplicationPolicy.prioritize( blockManagerId, filteredPeers, peersReplicatedTo, - blockId) + blockId, + numPeersToReplicateTo - peersReplicatedTo.size) } } logDebug(s"Replicating $blockId of ${data.size} bytes to " + - s"${peersReplicatedTo.size} peer(s) took ${System.currentTimeMillis - startTime} ms") + s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block 
$blockId replicated to only " + s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index dde2406fb4c2..7d2f443b53d3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -82,11 +82,7 @@ class BlockManagerId private ( host_ = in.readUTF() port_ = in.readInt() val isTopologyInfoAvailable = in.readBoolean() - topologyInfo_ = if (isTopologyInfoAvailable) { - Some(in.readUTF()) - } else { - None - } + topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None } @throws(classOf[IOException]) @@ -124,10 +120,10 @@ private[spark] object BlockManagerId { * @return A new [[org.apache.spark.storage.BlockManagerId]]. */ def apply( - execId: String, - host: String, - port: Int, - topologyInfo: Option[String] = None): BlockManagerId = + execId: String, + host: String, + port: Int, + topologyInfo: Option[String] = None): BlockManagerId = getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo)) def apply(in: ObjectInput): BlockManagerId = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index d9e662d9b287..7a600068912b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -59,7 +59,7 @@ class BlockManagerMaster( blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { - logInfo(s"Trying to register BlockManager $blockManagerId") + logInfo(s"Registering BlockManager $blockManagerId") val updatedId = driverEndpoint.askWithRetry[BlockManagerId]( RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index fa8473c97c81..5d1ae31f846e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -61,7 +61,7 @@ class BlockManagerMasterEndpoint( "org.apache.spark.storage.DefaultTopologyMapper") val clazz = Utils.classForName(topologyMapperClassName) val mapper = - clazz.getConstructor(conf.getClass).newInstance(conf).asInstanceOf[TopologyMapper] + clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper] logInfo(s"Using $topologyMapperClassName for getting topology information") mapper } @@ -310,16 +310,20 @@ class BlockManagerMasterEndpoint( ).map(_.flatten.toSeq) } - private def register(dummyId: BlockManagerId, - maxMemSize: Long, - slaveEndpoint: RpcEndpointRef): BlockManagerId = { + /** + * Returns the BlockManagerId with topology information populated, if available. + */ + private def register( + idWithoutTopologyInfo: BlockManagerId, + maxMemSize: Long, + slaveEndpoint: RpcEndpointRef): BlockManagerId = { // the dummy id is not expected to contain the topology information. 
// we get that info here and respond back with a more fleshed out block manager id val id = BlockManagerId( - dummyId.executorId, - dummyId.host, - dummyId.port, - Some(topologyMapper.getTopologyForHost(dummyId.host))) + idWithoutTopologyInfo.executorId, + idWithoutTopologyInfo.host, + idWithoutTopologyInfo.port, + topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host)) val time = System.currentTimeMillis() if (!blockManagerInfo.contains(id)) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala similarity index 56% rename from core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala rename to core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala index 358e23a83c9c..57e60d27b755 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala @@ -17,6 +17,7 @@ package org.apache.spark.storage +import scala.collection.mutable import scala.util.Random import org.apache.spark.annotation.DeveloperApi @@ -30,7 +31,7 @@ import org.apache.spark.internal.Logging * again to get a fresh prioritization. */ @DeveloperApi -trait BlockReplicationPrioritization { +trait BlockReplicationPolicy { /** * Method to prioritize a bunch of candidate peers of a block @@ -40,18 +41,21 @@ trait BlockReplicationPrioritization { * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. - * @return A prioritized list of peers. Lower the index of a peer, higher its priority + * @param numPeersToReplicateTo Number of peers we need to replicate to + * @return A prioritized list of peers. Lower the index of a peer, higher its priority. + * This returns a list of size at most `numPeersToReplicateTo`. */ def prioritize( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], + peersReplicatedTo: mutable.HashSet[BlockManagerId], + blockId: BlockId, + numPeersToReplicateTo: Int): List[BlockManagerId] } @DeveloperApi -class DefaultBlockReplicationPrioritization - extends BlockReplicationPrioritization +class RandomBlockReplicationPolicy + extends BlockReplicationPolicy with Logging { /** @@ -66,15 +70,43 @@ class DefaultBlockReplicationPrioritization * @return A prioritized list of peers. 
*/
override def prioritize(
- blockManagerId: BlockManagerId,
- peers: Seq[BlockManagerId],
- peersReplicatedTo: Set[BlockManagerId],
- blockId: BlockId): Seq[BlockManagerId] = {
+ blockManagerId: BlockManagerId,
+ peers: Seq[BlockManagerId],
+ peersReplicatedTo: mutable.HashSet[BlockManagerId],
+ blockId: BlockId,
+ numReplicas: Int): List[BlockManagerId] = {
val random = new Random(blockId.hashCode)
- logDebug(s"Input peers : ${peers.mkString(", ")}")
- val ret = random.shuffle(peers)
- logDebug(s"Prioritized peers : ${ret.mkString(", ")}")
- ret
+ val prioritizedPeers = if (peers.size > numReplicas) {
+ getSampleIds(peers.size, numReplicas, random).map(peers(_))
+ } else {
+ if (peers.size < numReplicas) {
+ logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer(s).")
+ }
+ random.shuffle(peers).toList
+ }
+ logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
+ prioritizedPeers
+ }
+
+ /**
+ * Uses the sampling algorithm by Robert Floyd. Finds a random sample of size m in
+ * O(m) while minimizing space usage.
+ * [[http://math.stackexchange.com/questions/178690/
+ * whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin]]
+ *
+ * @param n total number of indices
+ * @param m number of samples needed
+ * @param r random number generator
+ * @return list of m random unique indices
+ */
+ private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
+ val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (s, i) =>
+ val t = r.nextInt(i) + 1
+ if (s.contains(t)) s + i else s + t
+ }
+ // we shuffle the result to ensure a random arrangement within the sample
+ // to avoid any bias from set implementations
+ r.shuffle(indices.map(_ - 1).toList)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
index 70b6f74b0bd5..ff66e954f106 100644
--- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
@@ -17,12 +17,9 @@
package org.apache.spark.storage
-import java.io.{File, FileInputStream}
-import java.util.Properties
-
+import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
-import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
/**
@@ -41,15 +38,20 @@ abstract class TopologyMapper(conf: SparkConf) {
* For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
* ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
* This function only returns the topology information without the hostname.
+ * This information can be used when choosing executors for block replication
+ * to discern executors from a different rack than a candidate executor, for example.
+ *
+ * An implementation can choose to use empty strings or None in case topology info
+ * is not available. This would imply that all such executors belong to the same rack.
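+ *
+ * As an illustration of this contract, a hypothetical mapper (an example only,
+ * not part of this patch) could parse the rack out of a hostname convention
+ * such as `rack-2-host-17`, returning None for hosts that do not match:
+ * {{{
+ * class ConventionBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
+ *   // group 1 captures the rack prefix, e.g. "rack-2"
+ *   private val rackPattern = """(rack-\d+)-.*""".r
+ *   override def getTopologyForHost(hostname: String): Option[String] = hostname match {
+ *     case rackPattern(rack) => Some(s"/$rack")
+ *     case _ => None // no topology info: grouped with all other unknown hosts
+ *   }
+ * }
+ * }}}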
*/
- def getTopologyForHost(hostname: String): String
+ def getTopologyForHost(hostname: String): Option[String]
}
@DeveloperApi
class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
- override def getTopologyForHost(hostname: String): String = {
+ override def getTopologyForHost(hostname: String): Option[String] = {
logDebug(s"Got a request for $hostname")
- "DefaultRack"
+ Some("DefaultRack")
}
}
@@ -67,15 +69,14 @@ class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with
require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.")
val topologyMap = Utils.getPropertiesFromFile(topologyFile.get)
- override def getTopologyForHost(hostname: String): String = {
+ override def getTopologyForHost(hostname: String): Option[String] = {
val topology = topologyMap.get(hostname)
if (topology.isDefined) {
logDebug(s"$hostname -> ${topology.get}")
- topology.get
} else {
logWarning(s"$hostname does not have any topology information")
- ""
}
+ topology
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b9e3a364ee22..2f175c266733 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -21,11 +21,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.postfixOps
-
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
-
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.memory.UnifiedMemoryManager
@@ -37,6 +35,8 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
+import scala.collection.mutable
+
/** Testsuite that tests block replication in BlockManager */
class BlockManagerReplicationSuite extends SparkFunSuite
with Matchers
@@ -346,6 +346,47 @@ class BlockManagerReplicationSuite extends SparkFunSuite
}
}
+ /**
+ * Test if we get the required number of peers when using random sampling from
+ * RandomBlockReplicationPolicy
+ */
+ test(s"block replication - random block replication policy") {
+ val numBlockManagers = 10
+ val storeSize = 1000
+ val blockManagers = (1 to numBlockManagers).map { i =>
+ BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
+ }
+ val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
+ val replicationPolicy = new RandomBlockReplicationPolicy
+ val blockId = "test-block"
+
+ (1 to 10).foreach {numReplicas =>
+ logInfo(s"Num replicas : $numReplicas")
+ val randomPeers = replicationPolicy.prioritize(
+ candidateBlockManager,
+ blockManagers,
+ mutable.HashSet.empty[BlockManagerId],
+ blockId,
+ numReplicas
+ )
+ logInfo(s"Random peers : ${randomPeers.mkString(", ")}")
+ assert(randomPeers.size === numReplicas)
+
+ // choosing n peers out of n
+ val secondPass = replicationPolicy.prioritize(
+ candidateBlockManager,
+ randomPeers,
+ mutable.HashSet.empty[BlockManagerId],
+ blockId,
+ numReplicas
+ )
+ logInfo(s"Random peers : ${secondPass.mkString(", ")}")
+ assert(secondPass.size === numReplicas)
+ }
+
+ }
+
+
/**
* Test replication of blocks with different storage levels (various combinations of
* memory, disk & serialization). For each storage level, this function tests every store
From 632d0436ca701031e03bd141b95d4b0bb5544150 Mon Sep 17 00:00:00 2001
From: Shubham Chopra
Date: Mon, 29 Aug 2016 14:38:06 -0400
Subject: [PATCH 25/25] Incorporating corrections and suggestions by @rxin
---
.../apache/spark/storage/BlockManager.scala | 3 +-
.../storage/BlockManagerMasterEndpoint.scala | 3 +-
.../storage/BlockReplicationPolicy.scala | 8 +-
.../apache/spark/storage/TopologyMapper.scala | 14 ++--
.../BlockManagerReplicationSuite.scala | 43 +----------
.../storage/BlockReplicationPolicySuite.scala | 74 +++++++++++++++++++
.../spark/storage/TopologyMapperSuite.scala | 68 +++++++++++++++++
7 files changed, 159 insertions(+), 54 deletions(-)
create mode 100644 core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
create mode 100644 core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 7a7fcf38fdb0..982b83324e0f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -166,8 +166,7 @@ private[spark] class BlockManager(
blockReplicationPolicy = {
val priorityClass = conf.get(
- "spark.replication.topologyawareness.prioritizer",
- "org.apache.spark.storage.RandomBlockReplicationPolicy")
+ "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
val clazz = Utils.classForName(priorityClass)
val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
logInfo(s"Using $priorityClass for block replication policy")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 5d1ae31f846e..145c434a4f0c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -57,8 +57,7 @@ class BlockManagerMasterEndpoint(
private val topologyMapper = {
val topologyMapperClassName = conf.get(
- "spark.replication.topologyawareness.topologyMapper",
- "org.apache.spark.storage.DefaultTopologyMapper")
+ "spark.storage.replication.topologyMapper", classOf[DefaultTopologyMapper].getName)
val clazz = Utils.classForName(topologyMapperClassName)
val mapper =
clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
logInfo(s"Using $topologyMapperClassName for getting topology information")
mapper
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
index 57e60d27b755..bf087af16a5b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
@@ -41,7 +41,7 @@ trait BlockReplicationPolicy {
* @param peersReplicatedTo Set of peers already replicated to
* @param blockId BlockId of the block being replicated. This can be used as a source of
* randomness if needed.
- * @param numPeersToReplicateTo Number of peers we need to replicate to
+ * @param numReplicas Number of peers we need to replicate to
* @return A prioritized list of peers. Lower the index of a peer, higher its priority.
- * This returns a list of size at most `numPeersToReplicateTo`.
+ * This returns a list of size at most `numReplicas`.
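+ *
+ * A sketch of the intended call pattern (the receiver and argument names here
+ * are illustrative, not code from this patch):
+ * {{{
+ * val orderedPeers = policy.prioritize(
+ *   myId, peers, mutable.HashSet.empty[BlockManagerId], blockId, numReplicas = 2)
+ * // orderedPeers.size <= 2; replication tries peers in this order
+ * }}}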
*/
@@ -50,7 +50,7 @@ trait BlockReplicationPolicy {
peers: Seq[BlockManagerId],
peersReplicatedTo: mutable.HashSet[BlockManagerId],
blockId: BlockId,
- numPeersToReplicateTo: Int): List[BlockManagerId]
+ numReplicas: Int): List[BlockManagerId]
}
@DeveloperApi
@@ -101,9 +101,9 @@ class RandomBlockReplicationPolicy
* @return list of m random unique indices
*/
private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
- val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (s, i) =>
+ val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) =>
val t = r.nextInt(i) + 1
- if (s.contains(t)) s + i else s + t
+ if (set.contains(t)) set + i else set + t
}
// we shuffle the result to ensure a random arrangement within the sample
// to avoid any bias from set implementations
diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
index ff66e954f106..a0f0fdef8e94 100644
--- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
@@ -47,26 +47,30 @@ abstract class TopologyMapper(conf: SparkConf) {
def getTopologyForHost(hostname: String): Option[String]
}
+/**
+ * A TopologyMapper that assumes all nodes are in the same rack
+ */
@DeveloperApi
class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
override def getTopologyForHost(hostname: String): Option[String] = {
logDebug(s"Got a request for $hostname")
- Some("DefaultRack")
+ None
}
}
/**
* A simple file based topology mapper. This expects topology information provided as a
* [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
- * `spark.replication.topologyawareness.topologyFile`. To use this topology mapper, set the
- * `spark.replication.topologyawareness.topologyMapper` property to
+ * `spark.storage.replication.topologyFile`. To use this topology mapper, set the
+ * `spark.storage.replication.topologyMapper` property to
* [[org.apache.spark.storage.FileBasedTopologyMapper]]
* @param conf SparkConf object
*/
@DeveloperApi
class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
- val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile")
- require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.")
+ val topologyFile = conf.getOption("spark.storage.replication.topologyFile")
+ require(topologyFile.isDefined, "Please specify topology file via " +
+ "spark.storage.replication.topologyFile for FileBasedTopologyMapper.")
val topologyMap = Utils.getPropertiesFromFile(topologyFile.get)
override def getTopologyForHost(hostname: String): Option[String] = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 2f175c266733..0ad289b82359 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.postfixOps
+
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
+
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.memory.UnifiedMemoryManager
@@ -35,8 +37,6 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
-import scala.collection.mutable
-
/** Testsuite that tests block replication in BlockManager */
class BlockManagerReplicationSuite extends SparkFunSuite
with Matchers
@@ -346,45 +346,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite
}
}
- /**
- * Test if we get the required number of peers when using random sampling from
- * RandomBlockReplicationPolicy
- */
- test(s"block replication - random block replication policy") {
- val numBlockManagers = 10
- val storeSize = 1000
- val blockManagers = (1 to numBlockManagers).map { i =>
- BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
- }
- val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
- val replicationPolicy = new RandomBlockReplicationPolicy
- val blockId = "test-block"
-
- (1 to 10).foreach {numReplicas =>
- logInfo(s"Num replicas : $numReplicas")
- val randomPeers = replicationPolicy.prioritize(
- candidateBlockManager,
- blockManagers,
- mutable.HashSet.empty[BlockManagerId],
- blockId,
- numReplicas
- )
- logInfo(s"Random peers : ${randomPeers.mkString(", ")}")
- assert(randomPeers.size === numReplicas)
-
- // choosing n peers out of n
- val secondPass = replicationPolicy.prioritize(
- candidateBlockManager,
- randomPeers,
- mutable.HashSet.empty[BlockManagerId],
- blockId,
- numReplicas
- )
- logInfo(s"Random peers : ${secondPass.mkString(", ")}")
- assert(secondPass.size === numReplicas)
- }
-
- }
-
/**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
new file mode 100644
index 000000000000..800c3899f1a7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable
+
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+
+class BlockReplicationPolicySuite extends SparkFunSuite
+ with Matchers
+ with BeforeAndAfter
+ with LocalSparkContext {
+
+ // Implicitly convert strings to BlockIds for test clarity.
+ private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+
+ /**
+ * Test if we get the required number of peers when using random sampling from
+ * RandomBlockReplicationPolicy
+ */
+ test(s"block replication - random block replication policy") {
+ val numBlockManagers = 10
+ val storeSize = 1000
+ val blockManagers = (1 to numBlockManagers).map { i =>
+ BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
+ }
+ val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
+ val replicationPolicy = new RandomBlockReplicationPolicy
+ val blockId = "test-block"
+
+ (1 to 10).foreach { numReplicas =>
+ logDebug(s"Num replicas : $numReplicas")
+ val randomPeers = replicationPolicy.prioritize(
+ candidateBlockManager,
+ blockManagers,
+ mutable.HashSet.empty[BlockManagerId],
+ blockId,
+ numReplicas
+ )
+ logDebug(s"Random peers : ${randomPeers.mkString(", ")}")
+ assert(randomPeers.toSet.size === numReplicas)
+
+ // choosing n peers out of n
+ val secondPass = replicationPolicy.prioritize(
+ candidateBlockManager,
+ randomPeers,
+ mutable.HashSet.empty[BlockManagerId],
+ blockId,
+ numReplicas
+ )
+ logDebug(s"Random peers : ${secondPass.mkString(", ")}")
+ assert(secondPass.toSet.size === numReplicas)
+ }
+
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala b/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
new file mode 100644
index 000000000000..bbd252d7be7e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileOutputStream}
+
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.util.Utils
+
+class TopologyMapperSuite extends SparkFunSuite
+ with Matchers
+ with BeforeAndAfter
+ with LocalSparkContext {
+
+ test("File based Topology Mapper") {
+ val numHosts = 100
+ val numRacks = 4
+ val props = (1 to numHosts).map { i => s"host-$i" -> s"rack-${i % numRacks}" }.toMap
+ val propsFile = createPropertiesFile(props)
+
+ val sparkConf = new SparkConf(false)
+ sparkConf.set("spark.storage.replication.topologyFile", propsFile.getAbsolutePath)
+ val topologyMapper = new FileBasedTopologyMapper(sparkConf)
+
+ props.foreach { case (host, topology) =>
+ val obtainedTopology = topologyMapper.getTopologyForHost(host)
+ assert(obtainedTopology.isDefined)
+ assert(obtainedTopology.get === topology)
+ }
+
+ // we get None for hosts not in the file
+ assert(topologyMapper.getTopologyForHost("host").isEmpty)
+
+ cleanup(propsFile)
+ }
+
+ def createPropertiesFile(props: Map[String, String]): File = {
+ val testFile = new File(Utils.createTempDir(), "TopologyMapperSuite-test").getAbsoluteFile
+ val fileOS = new FileOutputStream(testFile)
+ props.foreach { case (k, v) => fileOS.write(s"$k=$v\n".getBytes) }
+ fileOS.close()
+ testFile
+ }
+
+ def cleanup(testFile: File): Unit = {
+ testFile.getParentFile.listFiles.filter { file =>
+ file.getName.startsWith(testFile.getName)
+ }.foreach { _.delete() }
+ }
+
+}
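Taken together, the final form of this series is enabled purely through configuration.
A minimal sketch of wiring up file-based, topology-aware replication with the property
names and classes from PATCH 25/25 follows; the file path and its contents are
illustrative, not part of the patch:

    import org.apache.spark.SparkConf

    // The topology file is a java.util.Properties file mapping hostname -> rack,
    // one "host-1=rack-1" style entry per line, read by FileBasedTopologyMapper
    // through Utils.getPropertiesFromFile (as exercised by TopologyMapperSuite above).
    val conf = new SparkConf()
      .set("spark.storage.replication.topologyMapper",
        "org.apache.spark.storage.FileBasedTopologyMapper")
      .set("spark.storage.replication.topologyFile", "/etc/spark/topology.properties")
      .set("spark.storage.replication.policy",
        "org.apache.spark.storage.RandomBlockReplicationPolicy")

With the default DefaultTopologyMapper, getTopologyForHost returns None for every
host, so all executors are treated as members of the same rack and peer selection
reduces to the plain random policy.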