
Commit b454d44

shubhamchopra authored and cloud-fan committed
[SPARK-15354][CORE] Topology aware block replication strategies
## What changes were proposed in this pull request?

Implementations of strategies for resilient block replication for different resource managers that replicate the 3-replica strategy used by HDFS: the first replica is placed on an executor, the second replica within the same rack as that executor, and a third replica on a different rack.

The implementation involves providing two pluggable classes: one runs in the driver and provides topology information for every host at cluster start, and the second prioritizes a list of peer BlockManagerIds. The prioritization itself can be thought of as an optimization problem that finds a minimal set of peers satisfying certain objectives and replicates to those peers first. The objectives can be used to express richer constraints over and above the HDFS-like 3-replica strategy.

## How was this patch tested?

This patch was tested with the unit tests for storage, along with new unit tests to verify the prioritization behaviour.

Author: Shubham Chopra <[email protected]>

Closes #13932 from shubhamchopra/PrioritizerStrategy.
1 parent edc87d7 commit b454d44
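Note: the sketch below (not part of the commit) shows how the two pluggable classes introduced here are wired together through configuration. The class names and config keys come from this diff; `DummyTopologyMapper` is the test-only mapper added in the test suite below, standing in for a deployment-specific mapper.

```scala
import org.apache.spark.SparkConf

// Sketch: plug in the rack-aware prioritizer and a TopologyMapper implementation.
// "DummyTopologyMapper" is the test-only mapper added in this commit; a real
// deployment would supply its own TopologyMapper subclass here instead.
val conf = new SparkConf()
  .set("spark.storage.replication.policy",
    "org.apache.spark.storage.BasicBlockReplicationPolicy")
  .set("spark.storage.replication.topologyMapper",
    "org.apache.spark.storage.DummyTopologyMapper")
```

The mapper resolves a host to a topology string (for example, a rack) when a BlockManager registers with the driver; the policy then uses that topology information to pick one in-rack peer, one out-of-rack peer, and random peers for any remaining replicas.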

File tree: 4 files changed (+222, -32 lines)


core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 0 additions & 3 deletions
@@ -49,7 +49,6 @@ import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
-
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
@@ -1258,7 +1257,6 @@ private[spark] class BlockManager(
       replication = 1)
 
     val numPeersToReplicateTo = level.replication - 1
-
     val startTime = System.nanoTime
 
     var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
@@ -1313,7 +1311,6 @@ private[spark] class BlockManager(
         numPeersToReplicateTo - peersReplicatedTo.size)
       }
     }
-
     logDebug(s"Replicating $blockId of ${data.size} bytes to " +
       s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
     if (peersReplicatedTo.size < numPeersToReplicateTo) {

core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala

Lines changed: 128 additions & 17 deletions
@@ -53,6 +53,46 @@ trait BlockReplicationPolicy {
       numReplicas: Int): List[BlockManagerId]
 }
 
+object BlockReplicationUtils {
+  // scalastyle:off line.size.limit
+  /**
+   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
+   * minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
+   * here</a>.
+   *
+   * @param n total number of indices
+   * @param m number of samples needed
+   * @param r random number generator
+   * @return list of m random unique indices
+   */
+  // scalastyle:on line.size.limit
+  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
+    val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) {case (set, i) =>
+      val t = r.nextInt(i) + 1
+      if (set.contains(t)) set + i else set + t
+    }
+    indices.map(_ - 1).toList
+  }
+
+  /**
+   * Get a random sample of size m from the elems
+   *
+   * @param elems
+   * @param m number of samples needed
+   * @param r random number generator
+   * @tparam T
+   * @return a random list of size m. If there are fewer than m elements in elems, we just
+   *         randomly shuffle elems
+   */
+  def getRandomSample[T](elems: Seq[T], m: Int, r: Random): List[T] = {
+    if (elems.size > m) {
+      getSampleIds(elems.size, m, r).map(elems(_))
+    } else {
+      r.shuffle(elems).toList
+    }
+  }
+}
+
 @DeveloperApi
 class RandomBlockReplicationPolicy
   extends BlockReplicationPolicy
@@ -67,6 +107,7 @@ class RandomBlockReplicationPolicy
    * @param peersReplicatedTo Set of peers already replicated to
    * @param blockId BlockId of the block being replicated. This can be used as a source of
    *                randomness if needed.
+   * @param numReplicas Number of peers we need to replicate to
    * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    */
   override def prioritize(
@@ -78,7 +119,7 @@ class RandomBlockReplicationPolicy
     val random = new Random(blockId.hashCode)
     logDebug(s"Input peers : ${peers.mkString(", ")}")
     val prioritizedPeers = if (peers.size > numReplicas) {
-      getSampleIds(peers.size, numReplicas, random).map(peers(_))
+      BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
     } else {
       if (peers.size < numReplicas) {
         logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
@@ -88,26 +129,96 @@ class RandomBlockReplicationPolicy
     logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
     prioritizedPeers
   }
+}
+
+@DeveloperApi
+class BasicBlockReplicationPolicy
+  extends BlockReplicationPolicy
+  with Logging {
 
-  // scalastyle:off line.size.limit
   /**
-   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
-   * minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
-   * here</a>.
+   * Method to prioritize a bunch of candidate peers of a block manager. This implementation
+   * replicates the behavior of block replication in HDFS. For a given number of replicas needed,
+   * we choose a peer within the rack, one outside and remaining blockmanagers are chosen at
+   * random, in that order till we meet the number of replicas needed.
+   * This works best with a total replication factor of 3, like HDFS.
    *
-   * @param n total number of indices
-   * @param m number of samples needed
-   * @param r random number generator
-   * @return list of m random unique indices
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *                randomness if needed.
+   * @param numReplicas Number of peers we need to replicate to
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    */
-  // scalastyle:on line.size.limit
-  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
-    val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) =>
-      val t = r.nextInt(i) + 1
-      if (set.contains(t)) set + i else set + t
+  override def prioritize(
+      blockManagerId: BlockManagerId,
+      peers: Seq[BlockManagerId],
+      peersReplicatedTo: mutable.HashSet[BlockManagerId],
+      blockId: BlockId,
+      numReplicas: Int): List[BlockManagerId] = {
+
+    logDebug(s"Input peers : $peers")
+    logDebug(s"BlockManagerId : $blockManagerId")
+
+    val random = new Random(blockId.hashCode)
+
+    // if block doesn't have topology info, we can't do much, so we randomly shuffle
+    // if there is, we see what's needed from peersReplicatedTo and based on numReplicas,
+    // we choose whats needed
+    if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) {
+      // no topology info for the block. The best we can do is randomly choose peers
+      BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
+    } else {
+      // we have topology information, we see what is left to be done from peersReplicatedTo
+      val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo)
+      val doneOutsideRack = peersReplicatedTo.exists { p =>
+        p.topologyInfo.isDefined && p.topologyInfo != blockManagerId.topologyInfo
+      }
+
+      if (doneOutsideRack && doneWithinRack) {
+        // we are done, we just return a random sample
+        BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
+      } else {
+        // we separate peers within and outside rack
+        val (inRackPeers, outOfRackPeers) = peers
+          .filter(_.host != blockManagerId.host)
+          .partition(_.topologyInfo == blockManagerId.topologyInfo)
+
+        val peerWithinRack = if (doneWithinRack) {
+          // we are done with in-rack replication, so don't need anymore peers
+          Seq.empty
+        } else {
+          if (inRackPeers.isEmpty) {
+            Seq.empty
+          } else {
+            Seq(inRackPeers(random.nextInt(inRackPeers.size)))
+          }
+        }
+
+        val peerOutsideRack = if (doneOutsideRack || numReplicas - peerWithinRack.size <= 0) {
+          Seq.empty
+        } else {
+          if (outOfRackPeers.isEmpty) {
+            Seq.empty
+          } else {
+            Seq(outOfRackPeers(random.nextInt(outOfRackPeers.size)))
+          }
+        }
+
+        val priorityPeers = peerWithinRack ++ peerOutsideRack
+        val numRemainingPeers = numReplicas - priorityPeers.size
+        val remainingPeers = if (numRemainingPeers > 0) {
+          val rPeers = peers.filter(p => !priorityPeers.contains(p))
+          BlockReplicationUtils.getRandomSample(rPeers, numRemainingPeers, random)
+        } else {
+          Seq.empty
+        }
+
+        (priorityPeers ++ remainingPeers).toList
+      }
+
     }
-    // we shuffle the result to ensure a random arrangement within the sample
-    // to avoid any bias from set implementations
-    r.shuffle(indices.map(_ - 1).toList)
   }
+
 }
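To make the prioritization order concrete, here is a small sketch that drives `BasicBlockReplicationPolicy` directly, in the spirit of the new unit tests below. The executor IDs, hosts, ports, and rack labels are invented for illustration, and since the `BlockManagerId` constructors are Spark-internal, a snippet like this would live alongside the tests in `org.apache.spark.storage`.

```scala
import scala.collection.mutable

import org.apache.spark.storage.{BasicBlockReplicationPolicy, BlockManagerId, TestBlockId}

// Hypothetical peers spread across two racks; topologyInfo carries the rack string.
val peers = Seq(
  BlockManagerId("exec-1", "host-1", 50001, Some("/Rack-1")),
  BlockManagerId("exec-2", "host-2", 50002, Some("/Rack-1")),
  BlockManagerId("exec-3", "host-3", 50003, Some("/Rack-2")),
  BlockManagerId("exec-4", "host-4", 50004, Some("/Rack-2")))
val self = BlockManagerId("exec-0", "host-0", 50000, Some("/Rack-1"))

val policy = new BasicBlockReplicationPolicy
// With no replicas yet and numReplicas = 2, the first entry should be an in-rack peer
// and the second an out-of-rack peer; any further replicas would be random picks.
val prioritized = policy.prioritize(
  self, peers, mutable.HashSet.empty[BlockManagerId], new TestBlockId("example-block"), 2)
```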

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 31 additions & 2 deletions
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.internal.Logging
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -36,13 +37,15 @@ import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.Utils
 
 trait BlockManagerReplicationBehavior extends SparkFunSuite
   with Matchers
   with BeforeAndAfter
   with LocalSparkContext {
 
   val conf: SparkConf
+
   protected var rpcEnv: RpcEnv = null
   protected var master: BlockManagerMaster = null
   protected lazy val securityMgr = new SecurityManager(conf)
@@ -55,7 +58,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
   protected val allStores = new ArrayBuffer[BlockManager]
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-
   protected lazy val serializer = new KryoSerializer(conf)
 
   // Implicitly convert strings to BlockIds for test clarity.
@@ -471,7 +473,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
   conf.set("spark.storage.replication.proactive", "true")
   conf.set("spark.storage.exceptionOnPinLeak", "true")
 
-  (2 to 5).foreach{ i =>
+  (2 to 5).foreach { i =>
     test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
       testProactiveReplication(i)
     }
@@ -524,3 +526,30 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
     }
   }
 }
+
+class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
+  // number of racks to test with
+  val numRacks = 3
+
+  /**
+   * Gets the topology information given the host name
+   *
+   * @param hostname Hostname
+   * @return random topology
+   */
+  override def getTopologyForHost(hostname: String): Option[String] = {
+    Some(s"/Rack-${Utils.random.nextInt(numRacks)}")
+  }
+}
+
+class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior {
+  val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
+  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set(
+    "spark.storage.replication.policy",
+    classOf[BasicBlockReplicationPolicy].getName)
+  conf.set(
+    "spark.storage.replication.topologyMapper",
+    classOf[DummyTopologyMapper].getName)
+}
+
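The `DummyTopologyMapper` above assigns racks at random, which is enough for the tests. A deployment-specific mapper only needs a `SparkConf` constructor and an override of `getTopologyForHost`. The sketch below assumes, purely for illustration, that the rack is encoded in a hostname prefix such as `rack3-node17`; the class name and parsing scheme are hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.TopologyMapper

// Illustrative only: derives "/rack3" from a hostname like "rack3-node17".
// Real clusters would look the host up in their own inventory instead.
class HostnamePrefixTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
  override def getTopologyForHost(hostname: String): Option[String] = {
    hostname.split("-").headOption.filter(_.nonEmpty).map(prefix => s"/$prefix")
  }
}
```

Registering it then mirrors what `BlockManagerBasicStrategyReplicationSuite` does with the dummy mapper: `conf.set("spark.storage.replication.topologyMapper", classOf[HostnamePrefixTopologyMapper].getName)`.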

core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala

Lines changed: 63 additions & 10 deletions
@@ -18,34 +18,34 @@
 package org.apache.spark.storage
 
 import scala.collection.mutable
+import scala.util.Random
 
 import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark.{LocalSparkContext, SparkFunSuite}
 
-class BlockReplicationPolicySuite extends SparkFunSuite
+class RandomBlockReplicationPolicyBehavior extends SparkFunSuite
   with Matchers
   with BeforeAndAfter
   with LocalSparkContext {
 
   // Implicitly convert strings to BlockIds for test clarity.
-  private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+  protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
 
+  val replicationPolicy: BlockReplicationPolicy = new RandomBlockReplicationPolicy
+
+  val blockId = "test-block"
   /**
    * Test if we get the required number of peers when using random sampling from
-   * RandomBlockReplicationPolicy
+   * BlockReplicationPolicy
    */
-  test(s"block replication - random block replication policy") {
+  test("block replication - random block replication policy") {
     val numBlockManagers = 10
     val storeSize = 1000
-    val blockManagers = (1 to numBlockManagers).map { i =>
-      BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
-    }
+    val blockManagers = generateBlockManagerIds(numBlockManagers, Seq("/Rack-1"))
     val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
-    val replicationPolicy = new RandomBlockReplicationPolicy
-    val blockId = "test-block"
 
-    (1 to 10).foreach {numReplicas =>
+    (1 to 10).foreach { numReplicas =>
       logDebug(s"Num replicas : $numReplicas")
       val randomPeers = replicationPolicy.prioritize(
         candidateBlockManager,
@@ -68,7 +68,60 @@ class BlockReplicationPolicySuite extends SparkFunSuite
       logDebug(s"Random peers : ${secondPass.mkString(", ")}")
       assert(secondPass.toSet.size === numReplicas)
     }
+  }
+
+  protected def generateBlockManagerIds(count: Int, racks: Seq[String]): Seq[BlockManagerId] = {
+    (1 to count).map{i =>
+      BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size))))
+    }
+  }
+}
+
+class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior {
+  override val replicationPolicy = new BasicBlockReplicationPolicy
+
+  test("All peers in the same rack") {
+    val racks = Seq("/default-rack")
+    val numBlockManager = 10
+    (1 to 10).foreach {numReplicas =>
+      val peers = generateBlockManagerIds(numBlockManager, racks)
+      val blockManager = BlockManagerId("Driver", "Host-driver", 10001, Some(racks.head))
+
+      val prioritizedPeers = replicationPolicy.prioritize(
+        blockManager,
+        peers,
+        mutable.HashSet.empty,
+        blockId,
+        numReplicas
+      )
 
+      assert(prioritizedPeers.toSet.size == numReplicas)
+      assert(prioritizedPeers.forall(p => p.host != blockManager.host))
+    }
   }
 
+  test("Peers in 2 racks") {
+    val racks = Seq("/Rack-1", "/Rack-2")
+    (1 to 10).foreach {numReplicas =>
+      val peers = generateBlockManagerIds(10, racks)
+      val blockManager = BlockManagerId("Driver", "Host-driver", 9001, Some(racks.head))
+
+      val prioritizedPeers = replicationPolicy.prioritize(
+        blockManager,
+        peers,
+        mutable.HashSet.empty,
+        blockId,
+        numReplicas
+      )
+
+      assert(prioritizedPeers.toSet.size == numReplicas)
+      val priorityPeers = prioritizedPeers.take(2)
+      assert(priorityPeers.forall(p => p.host != blockManager.host))
+      if(numReplicas > 1) {
+        // both these conditions should be satisfied when numReplicas > 1
+        assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo))
+        assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo))
+      }
+    }
+  }
 }
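Both policies lean on `BlockReplicationUtils.getRandomSample`, which applies Floyd's sampling to pick a fixed number of distinct peers without shuffling the whole candidate list. A quick sketch of the behaviour these suites assert; the element values are arbitrary, and since the helper is intended for Spark's internal use, treat this as illustrative code that would sit under `org.apache.spark.storage`.

```scala
import scala.util.Random

import org.apache.spark.storage.BlockReplicationUtils

// Drawing 3 elements from 10 candidates yields 3 distinct values; asking for more
// than are available simply returns a random shuffle of everything.
val candidates = (1 to 10).map(i => s"Host-$i")
val sample = BlockReplicationUtils.getRandomSample(candidates, 3, new Random(42L))
assert(sample.size == 3 && sample.toSet.size == 3)
```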
