31 commits
582488d
Adding capability to prioritize peer executors based on rack awarenes…
shubhamchopra May 5, 2016
05e39c4
Minor modifications to get past the style check errors.
shubhamchopra May 6, 2016
b639a2d
Using blockId hashcode as a source of randomness, so we don't keep ch…
shubhamchopra May 6, 2016
c8f4c54
Several changes:
shubhamchopra May 9, 2016
105febe
Adding null check so a Block Manager can be initialized without the m…
shubhamchopra May 9, 2016
29c9096
Renaming classes/variables from rack to a more general topology.
shubhamchopra May 12, 2016
162bd55
Renaming classes/variables from rack to a more general topology.
shubhamchopra May 12, 2016
45a4ddf
We continue to randomly choose peers, so there is no change in curren…
shubhamchopra May 12, 2016
977e876
Spelling correction and minor changes in comments to use a more gener…
shubhamchopra May 13, 2016
de26707
Providing peersReplicateTo to the prioritizer.
shubhamchopra May 16, 2016
c0a75fc
Adding developer api annotations to TopologyMapper and BlockReplicati…
shubhamchopra May 17, 2016
e0729e6
Changes recommended by @HyukjinKwon to fix style issues.
shubhamchopra May 18, 2016
0117dff
Updating prioritizer api to use current blockmanager id for self iden…
shubhamchopra May 20, 2016
e32798c
Fixing style issues.
shubhamchopra Aug 5, 2016
463f754
Fixing style issues.
shubhamchopra Aug 5, 2016
6f0cffa
Adding a set-cover formulation for picking peers to replicate blocks.
shubhamchopra May 24, 2016
daa7cb4
Adding newline to the end of file
shubhamchopra May 24, 2016
fa55d5c
Modifying the optimizer to use a modified greedy optimizer. We now tr…
shubhamchopra Jun 14, 2016
4cd86db
Making sure we consider peers we have previously replicated to.
shubhamchopra Jun 16, 2016
69cfc45
1. Fixing topology mapper class issue, so we instantiate it correctly…
shubhamchopra Jun 24, 2016
265a24e
Adding an assertion in test case.
shubhamchopra Jul 12, 2016
30ba3e9
Searching for the set of peer that meet most new objectives is optimi…
shubhamchopra Jul 18, 2016
0abdb3d
Adding a basic HDFS like block replication strategy. Re-using BlockMa…
shubhamchopra Jul 19, 2016
0a892f6
Running the suite for PrioritizerStrategy as well.
shubhamchopra Jul 22, 2016
485c9a1
Renaming to follow naming convention.
shubhamchopra Jul 22, 2016
88d60e2
Rebasing to master. Adding two strategies with numReplicas constraint…
shubhamchopra Nov 1, 2016
f638f02
Fixing the test case to use the right conf parameter to set the repli…
shubhamchopra Nov 1, 2016
bdf69d6
Fixing style errors.
shubhamchopra Nov 2, 2016
d17f6e9
Incorporating suggestions from @sameeragarwal. Removing objectives ba…
shubhamchopra Feb 24, 2017
794a720
Incorporating suggestions from @cloud-fan
shubhamchopra Mar 23, 2017
c465aaf
Correcting indentation.
shubhamchopra Mar 27, 2017
File: BlockManager.scala
@@ -48,7 +48,6 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer


/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
@@ -1228,7 +1227,6 @@ private[spark] class BlockManager(
replication = 1)

val numPeersToReplicateTo = level.replication - 1

val startTime = System.nanoTime

var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
@@ -1283,7 +1281,6 @@
numPeersToReplicateTo - peersReplicatedTo.size)
}
}

logDebug(s"Replicating $blockId of ${data.size} bytes to " +
s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
File: BlockReplicationPolicy.scala
@@ -53,6 +53,46 @@ trait BlockReplicationPolicy {
numReplicas: Int): List[BlockManagerId]
}

object BlockReplicationUtils {
// scalastyle:off line.size.limit
/**
* Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
* minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
* here</a>.
*
* @param n total number of indices
* @param m number of samples needed
* @param r random number generator
* @return list of m random unique indices
*/
// scalastyle:on line.size.limit
private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) {case (set, i) =>
val t = r.nextInt(i) + 1
if (set.contains(t)) set + i else set + t
}
indices.map(_ - 1).toList
}

/**
* Get a random sample of size m from elems
*
* @param elems collection of elements to draw the sample from
* @param m number of samples needed
* @param r random number generator
* @tparam T type of the elements in elems
* @return a random list of size m. If there are fewer than m elements in elems, we just
* randomly shuffle elems
*/
def getRandomSample[T](elems: Seq[T], m: Int, r: Random): List[T] = {
if (elems.size > m) {
getSampleIds(elems.size, m, r).map(elems(_))
} else {
r.shuffle(elems).toList

[Review thread]
@sameeragarwal (Member), Mar 24, 2017:
Given that you're already shuffling the sample here anyways, just out of curiosity is there any advantage of using Robert Floyd's algorithm over (say) Fisher-Yates? Also, more generally, is space complexity really a concern here? Can't we just use r.shuffle(totalSize).take(sampleSize) for easy readability?
EDIT: Please ignore my first concern. I misread the code.

shubhamchopra (Contributor, author):
I completely agree with you here. Except I was told earlier that iterating through a list the size of the executors was a concern. So this was to address time complexity.

Member:
But isn't the time complexity same for both cases? It seems like they both only differ in terms of space complexity.

}
}
}
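
A side note for readers of the thread above: below is a minimal, self-contained sketch (not part of the patch) comparing Floyd's sampling with the shuffle-and-take alternative raised in review. The object and method names (SamplingSketch, floydSample, shuffleSample) are invented for illustration; the Floyd variant mirrors what getSampleIds does, written here with 1-based indices for simplicity.

import scala.collection.mutable
import scala.util.Random

object SamplingSketch {
  // Floyd's algorithm: m distinct values from 1..n using m RNG calls and O(m) extra space.
  // Invariant: when index i is considered, i itself cannot already be in the set,
  // so "take i when t is already taken" never introduces a duplicate.
  def floydSample(n: Int, m: Int, r: Random): List[Int] = {
    val picked = mutable.LinkedHashSet.empty[Int]
    for (i <- n - m + 1 to n) {
      val t = r.nextInt(i) + 1
      picked += (if (picked.contains(t)) i else t)
    }
    picked.toList
  }

  // The alternative raised in review: shuffle every candidate index and keep the first m.
  // Equally uniform, but it materializes and permutes all n candidates.
  def shuffleSample(n: Int, m: Int, r: Random): List[Int] =
    r.shuffle((1 to n).toList).take(m)

  def main(args: Array[String]): Unit = {
    val r = new Random(42)
    println(floydSample(10, 3, r))    // three distinct values in 1..10
    println(shuffleSample(10, 3, r))
  }
}

The trade-off discussed in the thread is visible here: floydSample draws only m random numbers and stores m values, while shuffleSample touches all n candidates before discarding most of them; for a handful of replicas against a large executor list, the former does strictly less work.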

@DeveloperApi
class RandomBlockReplicationPolicy
extends BlockReplicationPolicy
@@ -67,6 +107,7 @@ class RandomBlockReplicationPolicy
* @param peersReplicatedTo Set of peers already replicated to
* @param blockId BlockId of the block being replicated. This can be used as a source of
* randomness if needed.
* @param numReplicas Number of peers we need to replicate to
* @return A prioritized list of peers. Lower the index of a peer, higher its priority
*/
override def prioritize(
@@ -78,7 +119,7 @@
val random = new Random(blockId.hashCode)
logDebug(s"Input peers : ${peers.mkString(", ")}")
val prioritizedPeers = if (peers.size > numReplicas) {
getSampleIds(peers.size, numReplicas, random).map(peers(_))
BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
} else {
if (peers.size < numReplicas) {
logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
@@ -88,26 +129,96 @@ class RandomBlockReplicationPolicy
logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
prioritizedPeers
}
}

@DeveloperApi
class BasicBlockReplicationPolicy
extends BlockReplicationPolicy
with Logging {

// scalastyle:off line.size.limit
/**
* Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
* minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
* here</a>.
* Method to prioritize a bunch of candidate peers of a block manager. This implementation
* replicates the behavior of block replication in HDFS. For a given number of replicas needed,
* we choose a peer within the rack, one outside and remaining blockmanagers are chosen at
* random, in that order till we meet the number of replicas needed.
* This works best with a total replication factor of 3, like HDFS.
*
* @param n total number of indices
* @param m number of samples needed
* @param r random number generator
* @return list of m random unique indices
* @param blockManagerId Id of the current BlockManager for self identification
* @param peers A list of peers of a BlockManager
* @param peersReplicatedTo Set of peers already replicated to
* @param blockId BlockId of the block being replicated. This can be used as a source of
* randomness if needed.
* @param numReplicas Number of peers we need to replicate to
* @return A prioritized list of peers. Lower the index of a peer, higher its priority
*/
// scalastyle:on line.size.limit
private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) =>
val t = r.nextInt(i) + 1
if (set.contains(t)) set + i else set + t
override def prioritize(
blockManagerId: BlockManagerId,
peers: Seq[BlockManagerId],
peersReplicatedTo: mutable.HashSet[BlockManagerId],
blockId: BlockId,
numReplicas: Int): List[BlockManagerId] = {

logDebug(s"Input peers : $peers")
logDebug(s"BlockManagerId : $blockManagerId")

val random = new Random(blockId.hashCode)

// if block doesn't have topology info, we can't do much, so we randomly shuffle
// if there is, we see what's needed from peersReplicatedTo and based on numReplicas,
// we choose what's needed
if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) {
// no topology info for the block. The best we can do is randomly choose peers
BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
} else {
// we have topology information, we see what is left to be done from peersReplicatedTo
val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo)

[Review thread]
Contributor:
shall we add a .filter(_.host != blockManagerId.host)?

@shubhamchopra (Contributor, author), Mar 27, 2017:
Master ensures the list of peers sent to a block manager doesn't include the requesting block manager. Was that the intention here?

val doneOutsideRack = peersReplicatedTo.exists { p =>
p.topologyInfo.isDefined && p.topologyInfo != blockManagerId.topologyInfo
}

if (doneOutsideRack && doneWithinRack) {
// we are done, we just return a random sample
BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
} else {
// we separate peers within and outside rack
val (inRackPeers, outOfRackPeers) = peers
.filter(_.host != blockManagerId.host)
.partition(_.topologyInfo == blockManagerId.topologyInfo)

val peerWithinRack = if (doneWithinRack) {
// we are done with in-rack replication, so don't need anymore peers
Seq.empty
} else {
if (inRackPeers.isEmpty) {
Seq.empty
} else {
Seq(inRackPeers(random.nextInt(inRackPeers.size)))
}
}

val peerOutsideRack = if (doneOutsideRack || numReplicas - peerWithinRack.size <= 0) {
Seq.empty
} else {
if (outOfRackPeers.isEmpty) {
Seq.empty
} else {
Seq(outOfRackPeers(random.nextInt(outOfRackPeers.size)))
}
}

val priorityPeers = peerWithinRack ++ peerOutsideRack
val numRemainingPeers = numReplicas - priorityPeers.size
val remainingPeers = if (numRemainingPeers > 0) {
val rPeers = peers.filter(p => !priorityPeers.contains(p))
BlockReplicationUtils.getRandomSample(rPeers, numRemainingPeers, random)
} else {
Seq.empty
}

(priorityPeers ++ remainingPeers).toList
}

}
// we shuffle the result to ensure a random arrangement within the sample
// to avoid any bias from set implementations
r.shuffle(indices.map(_ - 1).toList)
}

}
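
To make the prioritization order described in the Scaladoc above concrete, here is a hedged sketch of driving the new policy by hand. Because BlockManagerId's factory and TestBlockId are Spark-internal, the sketch assumes it lives in the org.apache.spark.storage package, just like the test suites below; the executor ids, host names and rack labels are invented.

package org.apache.spark.storage

import scala.collection.mutable

object BasicPolicySketch {
  def main(args: Array[String]): Unit = {
    // Five hypothetical peers spread over two racks.
    val peers = (1 to 5).map { i =>
      val rack = if (i % 2 == 0) "/Rack-1" else "/Rack-2"
      BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(rack))
    }
    // The block manager asking for replication sits on /Rack-1.
    val self = BlockManagerId("Exec-0", "Host-0", 9999, Some("/Rack-1"))

    val policy = new BasicBlockReplicationPolicy
    val prioritized = policy.prioritize(
      self,
      peers,
      mutable.HashSet.empty[BlockManagerId],  // nothing replicated yet
      TestBlockId("example-block"),
      numReplicas = 3)

    // Expected shape with an empty peersReplicatedTo set: one peer sharing /Rack-1 with
    // `self`, one peer on another rack, then randomly sampled peers to fill the quota.
    println(prioritized.mkString(", "))
  }
}

This is exactly the behaviour the "Peers in 2 racks" test towards the end of this diff asserts: among the first candidates, one shares the requester's rack and one does not.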
File: BlockManagerReplicationSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Eventually._

import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -36,13 +37,15 @@ import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.util.Utils

trait BlockManagerReplicationBehavior extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {

val conf: SparkConf

protected var rpcEnv: RpcEnv = null
protected var master: BlockManagerMaster = null
protected lazy val securityMgr = new SecurityManager(conf)
@@ -55,7 +58,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected val allStores = new ArrayBuffer[BlockManager]

// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test

protected lazy val serializer = new KryoSerializer(conf)

// Implicitly convert strings to BlockIds for test clarity.
@@ -471,7 +473,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
conf.set("spark.storage.replication.proactive", "true")
conf.set("spark.storage.exceptionOnPinLeak", "true")

(2 to 5).foreach{ i =>
(2 to 5).foreach { i =>
test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
testProactiveReplication(i)
}
@@ -524,3 +526,30 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
}
}
}

class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
// number of racks to test with
val numRacks = 3

/**
* Gets the topology information given the host name
*
* @param hostname Hostname
* @return random topology
*/
override def getTopologyForHost(hostname: String): Option[String] = {
Some(s"/Rack-${Utils.random.nextInt(numRacks)}")
}
}

class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior {
val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set(
"spark.storage.replication.policy",
classOf[BasicBlockReplicationPolicy].getName)
conf.set(
"spark.storage.replication.topologyMapper",
classOf[DummyTopologyMapper].getName)
}
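
The suite above also doubles as a recipe for opting in to the new strategy. Below is a hedged sketch of what that might look like in an application, assuming only the two configuration keys as they appear in this diff; com.example.MyRackTopologyMapper is a hypothetical user-supplied mapper, sketched after the last test suite below.

import org.apache.spark.{SparkConf, SparkContext}

object TopologyAwareReplicationDemo {
  def main(args: Array[String]): Unit = {
    // Sketch only: wire in the HDFS-style policy and a custom topology mapper using
    // the configuration keys exercised by BlockManagerBasicStrategyReplicationSuite.
    val conf = new SparkConf()
      .setAppName("topology-aware-replication-demo")
      .set("spark.storage.replication.policy",
        "org.apache.spark.storage.BasicBlockReplicationPolicy")
      .set("spark.storage.replication.topologyMapper",
        "com.example.MyRackTopologyMapper")  // hypothetical mapper, see sketch at the end

    val sc = new SparkContext(conf)
    // Blocks persisted with a replicated level (e.g. StorageLevel.MEMORY_ONLY_2) will now
    // have their replica destinations chosen by BasicBlockReplicationPolicy.
    sc.stop()
  }
}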

File: BlockReplicationPolicySuite.scala
@@ -18,34 +18,34 @@
package org.apache.spark.storage

import scala.collection.mutable
import scala.util.Random

import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.{LocalSparkContext, SparkFunSuite}

class BlockReplicationPolicySuite extends SparkFunSuite
class RandomBlockReplicationPolicyBehavior extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {

// Implicitly convert strings to BlockIds for test clarity.
private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)

val replicationPolicy: BlockReplicationPolicy = new RandomBlockReplicationPolicy

val blockId = "test-block"
/**
* Test if we get the required number of peers when using random sampling from
* RandomBlockReplicationPolicy
* BlockReplicationPolicy
*/
test(s"block replication - random block replication policy") {
test("block replication - random block replication policy") {
val numBlockManagers = 10
val storeSize = 1000
val blockManagers = (1 to numBlockManagers).map { i =>
BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
}
val blockManagers = generateBlockManagerIds(numBlockManagers, Seq("/Rack-1"))
val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
val replicationPolicy = new RandomBlockReplicationPolicy
val blockId = "test-block"

(1 to 10).foreach {numReplicas =>
(1 to 10).foreach { numReplicas =>
logDebug(s"Num replicas : $numReplicas")
val randomPeers = replicationPolicy.prioritize(
candidateBlockManager,
@@ -68,7 +68,60 @@ class BlockReplicationPolicySuite extends SparkFunSuite
logDebug(s"Random peers : ${secondPass.mkString(", ")}")
assert(secondPass.toSet.size === numReplicas)
}
}

protected def generateBlockManagerIds(count: Int, racks: Seq[String]): Seq[BlockManagerId] = {
(1 to count).map{i =>
BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size))))
}
}
}

class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior {
override val replicationPolicy = new BasicBlockReplicationPolicy

test("All peers in the same rack") {
val racks = Seq("/default-rack")
val numBlockManager = 10
(1 to 10).foreach {numReplicas =>
val peers = generateBlockManagerIds(numBlockManager, racks)
val blockManager = BlockManagerId("Driver", "Host-driver", 10001, Some(racks.head))

val prioritizedPeers = replicationPolicy.prioritize(
blockManager,
peers,
mutable.HashSet.empty,
blockId,
numReplicas
)

assert(prioritizedPeers.toSet.size == numReplicas)
assert(prioritizedPeers.forall(p => p.host != blockManager.host))
}
}

test("Peers in 2 racks") {

[Review thread]
Contributor:
fixed by #17624

val racks = Seq("/Rack-1", "/Rack-2")
(1 to 10).foreach {numReplicas =>
val peers = generateBlockManagerIds(10, racks)
val blockManager = BlockManagerId("Driver", "Host-driver", 9001, Some(racks.head))

val prioritizedPeers = replicationPolicy.prioritize(
blockManager,
peers,
mutable.HashSet.empty,
blockId,
numReplicas
)

assert(prioritizedPeers.toSet.size == numReplicas)
val priorityPeers = prioritizedPeers.take(2)
assert(priorityPeers.forall(p => p.host != blockManager.host))
if(numReplicas > 1) {
// both these conditions should be satisfied when numReplicas > 1
assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo))
assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo))

[Review thread]
Contributor:
shall we test more explicitly that the first candidate is within rack and the second candidate is outside rack?

@shubhamchopra (Contributor, author), Mar 27, 2017:
The intended behavior is to ensure one is within rack and one outside, not necessarily the first or the second.

}
}
}
}
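
Since DummyTopologyMapper above hands out racks at random (fine for a test, not for production), here is a hedged sketch of a deterministic mapper against the same TopologyMapper API, matching the hypothetical com.example.MyRackTopologyMapper referenced in the configuration sketch earlier. The host-to-rack table is invented for illustration.

package com.example

import org.apache.spark.SparkConf
import org.apache.spark.storage.TopologyMapper

// Illustrative only: maps known hosts to fixed rack labels; unknown hosts get no
// topology info, so the replication policy falls back to random placement for them.
class MyRackTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
  private val rackByHost: Map[String, String] = Map(
    "host-a1" -> "/Rack-A",
    "host-a2" -> "/Rack-A",
    "host-b1" -> "/Rack-B")

  override def getTopologyForHost(hostname: String): Option[String] =
    rackByHost.get(hostname)
}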