From 582488de002ae08e32c391abb63f7a6eb63419dd Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 5 May 2016 18:06:14 -0400 Subject: [PATCH 01/31] Adding capability to prioritize peer executors based on rack awareness while replicating blocks. --- .../apache/spark/storage/BlockManager.scala | 1 - .../spark/storage/BlockManagerMaster.scala | 4 ++ .../storage/BlockManagerMasterEndpoint.scala | 7 +++ .../spark/storage/BlockManagerMessages.scala | 2 + .../storage/RackAwarePrioritization.scala | 49 +++++++++++++++++++ 5 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 991346a40af4..1ff8c986beee 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -48,7 +48,6 @@ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer - /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( val data: Iterator[Any], diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 3ca690db9e79..59d8743e9f71 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -235,6 +235,10 @@ class BlockManagerMaster( } } + def getRackInfo(host: String): String = { + driverEndpoint.askWithRetry[String](GetRackInfo(host)) + } + /** Send a one-way message to the master endpoint, to which we expect it to reply with true. 
*/ private def tell(message: Any) { if (!driverEndpoint.askSync[Boolean](message)) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 84c04d22600a..39a84447238d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -138,6 +138,13 @@ class BlockManagerMasterEndpoint( } case None => context.reply(false) } + + case GetRackInfo(host) => context.reply(getRackInfoForHost(host)) + + } + + private def getRackInfoForHost(host: String): String = { + s"$host-1" } private def removeRdd(rddId: Int): Future[Seq[Int]] = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 0aea438e7f47..e4c80eaa3a0e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -115,4 +115,6 @@ private[spark] object BlockManagerMessages { case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster + + case class GetRackInfo(host: String) extends ToBlockManagerMaster } diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala new file mode 100644 index 000000000000..ce0d48ec170d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.internal.Logging + +/* Trait that should be implemented by any class implementing rack aware prioritization */ +trait RackAwarePriotization { + /** + * Method to prioritize a bunch of candidate peers of a block + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. Lower the index of a peer, higher its priority + */ + def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] +} + +class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { + /** + * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, + * that just makes sure we put blocks on different hosts, if possible + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority + */ + override def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] = { + logInfo(s"Input peers : ${peers.mkString(", ")}") + val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) + val peersOnHost = peers.filter(p => p.host.equals(host)) + val ret = peersOnOtherHosts ++ peersOnHost + logInfo(s"Prioritized peers : ${ret.mkString(", ")}") + ret + } +} \ No newline at end of file From 05e39c4e010a8683c89e9e44562eac77ed2e1d9a Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 6 May 2016 13:40:47 -0400 Subject: [PATCH 02/31] Minor modifications to get past the style check errors. --- .../storage/RackAwarePrioritization.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala index ce0d48ec170d..cb790b5229cc 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -21,23 +21,25 @@ import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ trait RackAwarePriotization { + /** - * Method to prioritize a bunch of candidate peers of a block - * - * @param peers A list of peers of a BlockManager - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ + * Method to prioritize a bunch of candidate peers of a block + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. Lower the index of a peer, higher its priority + */ def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] } class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { + /** - * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, - * that just makes sure we put blocks on different hosts, if possible - * - * @param peers A list of peers of a BlockManager - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ + * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, + * that just makes sure we put blocks on different hosts, if possible + * + * @param peers A list of peers of a BlockManager + * @return A prioritized list of peers. Lower the index of a peer, higher its priority + */ override def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] = { logInfo(s"Input peers : ${peers.mkString(", ")}") val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) @@ -46,4 +48,4 @@ class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization logInfo(s"Prioritized peers : ${ret.mkString(", ")}") ret } -} \ No newline at end of file +} From b639a2daaf4afbfbf3c08575eaa06f5a09317cfb Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 6 May 2016 16:31:22 -0400 Subject: [PATCH 03/31] Using blockId hashcode as a source of randomness, so we don't keep choosing the same peers for replication. 
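Seeding the shuffle with the block id keeps the peer ordering stable for a given block across replication attempts, while different blocks (different hash codes) still land on different peers. A minimal sketch of the idea, with illustrative names rather than the exact patch code:

    import scala.util.Random

    def orderPeers(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = {
      // Same block id => same seed => same ordering on every attempt;
      // different blocks hash differently, so replicas spread out.
      val random = new Random(blockId.hashCode)
      random.shuffle(peers)
    }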
--- .../spark/storage/RackAwarePrioritization.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala index cb790b5229cc..7dcf48421fb8 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -17,6 +17,8 @@ package org.apache.spark.storage +import scala.util.Random + import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ @@ -24,11 +26,12 @@ trait RackAwarePriotization { /** * Method to prioritize a bunch of candidate peers of a block - * * @param peers A list of peers of a BlockManager + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] + def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] } class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { @@ -38,12 +41,16 @@ class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization * that just makes sure we put blocks on different hosts, if possible * * @param peers A list of peers of a BlockManager + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] = { + override def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { + val random = new Random(blockId.hashCode) + logInfo(s"Input peers : ${peers.mkString(", ")}") - val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) - val peersOnHost = peers.filter(p => p.host.equals(host)) + val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)).sortBy(_ => random.nextInt) + val peersOnHost = peers.filter(p => p.host.equals(host)).sortBy(_ => random.nextInt) val ret = peersOnOtherHosts ++ peersOnHost logInfo(s"Prioritized peers : ${ret.mkString(", ")}") ret From c8f4c54ac9e916a8aeffc8e0c2a9f44724460815 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Mon, 9 May 2016 16:36:53 -0400 Subject: [PATCH 04/31] Several changes: 1. Adding rack attribute to hashcode and equals to block manager id. 2. Removing boolean check for rack awareness. Asking master for rack info, and master uses topology mapper. 3. Adding a topology mapper trait and a default implementation that block manager master endpoint uses to discern topology information. 
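The topology mapper itself is not part of this diff; a rough sketch of the intended shape, with hypothetical names the actual trait may not match:

    import org.apache.spark.SparkConf

    abstract class TopologyMapper(conf: SparkConf) {
      // Map a hostname to its topology (e.g. rack) identifier, if known.
      def getTopologyForHost(hostname: String): Option[String]
    }

    // Default implementation: a flat topology carrying no rack information.
    class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
      override def getTopologyForHost(hostname: String): Option[String] = None
    }

The master endpoint can then resolve a registering block manager's host through the configured mapper and attach the result to its BlockManagerId.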
--- .../spark/storage/BlockManagerMasterEndpoint.scala | 7 ------- .../apache/spark/storage/RackAwarePrioritization.scala | 10 +++++----- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 39a84447238d..84c04d22600a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -138,13 +138,6 @@ class BlockManagerMasterEndpoint( } case None => context.reply(false) } - - case GetRackInfo(host) => context.reply(getRackInfoForHost(host)) - - } - - private def getRackInfoForHost(host: String): String = { - s"$host-1" } private def removeRdd(rddId: Int): Future[Seq[Int]] = { diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala index 7dcf48421fb8..99c351dd863e 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala @@ -48,11 +48,11 @@ class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization override def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) - logInfo(s"Input peers : ${peers.mkString(", ")}") - val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)).sortBy(_ => random.nextInt) - val peersOnHost = peers.filter(p => p.host.equals(host)).sortBy(_ => random.nextInt) - val ret = peersOnOtherHosts ++ peersOnHost - logInfo(s"Prioritized peers : ${ret.mkString(", ")}") + logDebug(s"Input peers : ${peers.mkString(", ")}") + val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) + val peersOnHost = peers.filter(p => p.host.equals(host)) + val ret = random.shuffle(peersOnOtherHosts) ++ random.shuffle(peersOnHost) + logDebug(s"Prioritized peers : ${ret.mkString(", ")}") ret } } From 105febeacc1733a42c30a976ebb46e8e19682ad6 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Mon, 9 May 2016 16:58:21 -0400 Subject: [PATCH 05/31] Adding null check so a Block Manager can be initialized without the master. --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1ff8c986beee..fd53e1124ce0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -164,7 +164,7 @@ private[spark] class BlockManager( def initialize(appId: String): Unit = { blockTransferService.init(this) shuffleClient.init(appId) - + blockReplicationPolicy = { val priorityClass = conf.get( "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName) From 29c909699d4f63147d2c26c6d65b595e42ab7d7f Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 12 May 2016 17:09:16 -0400 Subject: [PATCH 06/31] Renaming classes/variables from rack to a more general topology.
--- .../main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/storage/BlockManagerMaster.scala | 2 +- .../org/apache/spark/storage/BlockManagerMessages.scala | 2 +- ...tization.scala => BlockReplicationPrioritization.scala} | 7 +++++-- 4 files changed, 8 insertions(+), 5 deletions(-) rename core/src/main/scala/org/apache/spark/storage/{RackAwarePrioritization.scala => BlockReplicationPrioritization.scala} (93%) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index fd53e1124ce0..1ff8c986beee 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -164,7 +164,7 @@ private[spark] class BlockManager( def initialize(appId: String): Unit = { blockTransferService.init(this) shuffleClient.init(appId) - + blockReplicationPolicy = { val priorityClass = conf.get( "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 59d8743e9f71..a184fb752d64 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -236,7 +236,7 @@ class BlockManagerMaster( } def getRackInfo(host: String): String = { - driverEndpoint.askWithRetry[String](GetRackInfo(host)) + driverEndpoint.askWithRetry[String](GetTopologyInfo(host)) } /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index e4c80eaa3a0e..73a7d3e4d7b3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -116,5 +116,5 @@ private[spark] object BlockManagerMessages { case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster - case class GetRackInfo(host: String) extends ToBlockManagerMaster + case class GetTopologyInfo(host: String) extends ToBlockManagerMaster } diff --git a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala similarity index 93% rename from core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala rename to core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 99c351dd863e..8680568f6976 100644 --- a/core/src/main/scala/org/apache/spark/storage/RackAwarePrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -22,10 +22,11 @@ import scala.util.Random import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ -trait RackAwarePriotization { +trait BlockReplicationPriotization { /** * Method to prioritize a bunch of candidate peers of a block + * * @param peers A list of peers of a BlockManager * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. 
@@ -34,7 +35,9 @@ trait RackAwarePriotization { def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] } -class DefaultRackAwarePrioritization(host: String) extends RackAwarePriotization with Logging { +class DefaultBlockReplicationPrioritization(host: String) + extends BlockReplicationPriotization + with Logging { /** * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, From 162bd55f1dfbbdc1f79cfdb388a3708c863214eb Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 12 May 2016 17:15:46 -0400 Subject: [PATCH 07/31] Renaming classes/variables from rack to a more general topology. --- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 +-- .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1ff8c986beee..5f9ea1055375 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1227,7 +1227,6 @@ private[spark] class BlockManager( replication = 1) val numPeersToReplicateTo = level.replication - 1 - val startTime = System.nanoTime var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas @@ -1282,7 +1281,7 @@ private[spark] class BlockManager( numPeersToReplicateTo - peersReplicatedTo.size) } } - + logDebug(s"Replicating $blockId of ${data.size} bytes to " + s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index a184fb752d64..06904cabbef9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -235,7 +235,7 @@ class BlockManagerMaster( } } - def getRackInfo(host: String): String = { + def getTopologyInfo(host: String): String = { driverEndpoint.askWithRetry[String](GetTopologyInfo(host)) } From 45a4ddf42870547726add2f83fd48658e48f3a19 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 12 May 2016 17:27:29 -0400 Subject: [PATCH 08/31] We continue to randomly choose peers, so there is no change in current behavior. 
--- .../apache/spark/storage/BlockReplicationPrioritization.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 8680568f6976..c2d9d37ace22 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -52,9 +52,7 @@ class DefaultBlockReplicationPrioritization(host: String) val random = new Random(blockId.hashCode) logDebug(s"Input peers : ${peers.mkString(", ")}") - val peersOnOtherHosts = peers.filter(p => !p.host.equals(host)) - val peersOnHost = peers.filter(p => p.host.equals(host)) - val ret = random.shuffle(peersOnOtherHosts) ++ random.shuffle(peersOnHost) + val ret = random.shuffle(peers) logDebug(s"Prioritized peers : ${ret.mkString(", ")}") ret } From 977e876675dff34aa8618861b15a9cb7072ee1d0 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 13 May 2016 11:36:17 -0400 Subject: [PATCH 09/31] Spelling correction and minor changes in comments to use a more general topology instead of rack. --- .../apache/spark/storage/BlockReplicationPrioritization.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index c2d9d37ace22..1464658fcebb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.spark.internal.Logging /* Trait that should be implemented by any class implementing rack aware prioritization */ -trait BlockReplicationPriotization { +trait BlockReplicationPrioritization { /** * Method to prioritize a bunch of candidate peers of a block @@ -36,7 +36,7 @@ trait BlockReplicationPriotization { } class DefaultBlockReplicationPrioritization(host: String) - extends BlockReplicationPriotization + extends BlockReplicationPrioritization with Logging { /** From de26707c01d267920e4f77f2b8b21964a462f2f0 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Mon, 16 May 2016 17:47:33 -0400 Subject: [PATCH 10/31] Providing peersReplicatedTo to the prioritizer. --- .../storage/BlockReplicationPrioritization.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 1464658fcebb..c348e039b98f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -26,13 +26,16 @@ trait BlockReplicationPrioritization { /** * Method to prioritize a bunch of candidate peers of a block - * + * * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers.
Lower the index of a peer, higher its priority */ - def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] + def prioritize(peers: Seq[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] } class DefaultBlockReplicationPrioritization(host: String) @@ -44,11 +47,14 @@ class DefaultBlockReplicationPrioritization(host: String) * that just makes sure we put blocks on different hosts, if possible * * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(peers: Seq[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { + override def prioritize(peers: Seq[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) logDebug(s"Input peers : ${peers.mkString(", ")}") From c0a75fc31604969204618240cf23238b6d71113b Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 17 May 2016 15:25:34 -0400 Subject: [PATCH 11/31] Adding developer api annotations to TopologyMapper and BlockReplicationPrioritization --- .../storage/BlockReplicationPrioritization.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index c348e039b98f..f0636610db45 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -17,11 +17,17 @@ package org.apache.spark.storage -import scala.util.Random +import org.apache.spark.annotation.DeveloperApi +import scala.util.Random import org.apache.spark.internal.Logging -/* Trait that should be implemented by any class implementing rack aware prioritization */ +/** + * ::DeveloperApi:: + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for + * replicating blocks + */ +@DeveloperApi trait BlockReplicationPrioritization { /** @@ -38,6 +44,7 @@ trait BlockReplicationPrioritization { blockId: BlockId): Seq[BlockManagerId] } +@DeveloperApi class DefaultBlockReplicationPrioritization(host: String) extends BlockReplicationPrioritization with Logging { From e0729e6372bf2ebb60aa9bd0ce56b6f696fd57d1 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Wed, 18 May 2016 16:52:22 -0400 Subject: [PATCH 12/31] Changes recommended by @HyukjinKwon to fix style issues. 
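For reference, the style checker wants imports grouped and blank-line separated -- java/javax first, then scala.*, then third-party, then org.apache.spark -- alphabetized within each group. The corrected ordering used below:

    import scala.util.Random

    import org.apache.spark.annotation.DeveloperApi
    import org.apache.spark.internal.Logging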
--- .../storage/BlockReplicationPrioritization.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index f0636610db45..8c7903a0579d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -17,9 +17,9 @@ package org.apache.spark.storage -import org.apache.spark.annotation.DeveloperApi - import scala.util.Random + +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging /** @@ -40,8 +40,8 @@ trait BlockReplicationPrioritization { * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ def prioritize(peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] } @DeveloperApi @@ -60,8 +60,8 @@ class DefaultBlockReplicationPrioritization(host: String) * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ override def prioritize(peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] = { + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) logDebug(s"Input peers : ${peers.mkString(", ")}") From 0117dffc078fcba1acc0396b3f862c379b40aaeb Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 20 May 2016 14:43:07 -0400 Subject: [PATCH 13/31] Updating prioritizer api to use current blockmanager id for self identification. --- .../spark/storage/BlockReplicationPrioritization.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 8c7903a0579d..457f7813822f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -33,19 +33,21 @@ trait BlockReplicationPrioritization { /** * Method to prioritize a bunch of candidate peers of a block * + * @param blockManagerId Id of the current BlockManager for self identification * @param peers A list of peers of a BlockManager * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - def prioritize(peers: Seq[BlockManagerId], + def prioritize(blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] } @DeveloperApi -class DefaultBlockReplicationPrioritization(host: String) +class DefaultBlockReplicationPrioritization extends BlockReplicationPrioritization with Logging { @@ -53,13 +55,15 @@ class DefaultBlockReplicationPrioritization(host: String) * Method to prioritize a bunch of candidate peers of a block. 
This is a basic implementation, * that just makes sure we put blocks on different hosts, if possible * + * @param blockManagerId Id of the current BlockManager for self identification * @param peers A list of peers of a BlockManager * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(peers: Seq[BlockManagerId], + override def prioritize(blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { val random = new Random(blockId.hashCode) From e32798c3ef3eaedbf88869d2f450258a37a613bc Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 5 Aug 2016 14:34:37 -0400 Subject: [PATCH 14/31] Fixing style issues. --- .../org/apache/spark/storage/BlockManagerMessages.scala | 2 -- .../spark/storage/BlockReplicationPrioritization.scala | 6 ++++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 73a7d3e4d7b3..0aea438e7f47 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -115,6 +115,4 @@ private[spark] object BlockManagerMessages { case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster - - case class GetTopologyInfo(host: String) extends ToBlockManagerMaster } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 457f7813822f..77911620ad6e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -40,7 +40,8 @@ trait BlockReplicationPrioritization { * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - def prioritize(blockManagerId: BlockManagerId, + def prioritize( + blockManagerId: BlockManagerId, peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] @@ -62,7 +63,8 @@ class DefaultBlockReplicationPrioritization * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(blockManagerId: BlockManagerId, + override def prioritize( + blockManagerId: BlockManagerId, peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { From 463f7548d702435aebd0ddd766c0540aefad1f65 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 5 Aug 2016 14:38:34 -0400 Subject: [PATCH 15/31] Fixing style issues. 
--- .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 06904cabbef9..3ca690db9e79 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -235,10 +235,6 @@ class BlockManagerMaster( } } - def getTopologyInfo(host: String): String = { - driverEndpoint.askWithRetry[String](GetTopologyInfo(host)) - } - /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */ private def tell(message: Any) { if (!driverEndpoint.askSync[Boolean](message)) { From 6f0cffa95a8c1fb7566f6ca2d186a44fcffd854e Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 24 May 2016 14:09:08 -0400 Subject: [PATCH 16/31] Adding a set-cover formulation for picking peers to replicate blocks. --- .../storage/BlockReplicationObjectives.scala | 154 ++++++++++++++++++ .../storage/BlockReplicationObjectives.scala | 73 +++++++++ 2 files changed, 227 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala create mode 100644 core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala new file mode 100644 index 000000000000..8c3759e29b74 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +import scala.annotation.tailrec +import scala.util.Random + +trait BlockReplicationObjective { + def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean + def getPeersToMeetObjective(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Set[BlockManagerId] +} + +object ReplicateToADifferentHost + extends BlockReplicationObjective { + override def isObjectiveMet(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Boolean = { + peers.exists(_.host != blockManagerId.host) + } + + override def getPeersToMeetObjective(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Set[BlockManagerId] = { + peers.filter(_.host != blockManagerId.host) + } + + override def toString: String = "ReplicateToADifferentHost" +} + +object ReplicateBlockWithinRack + extends BlockReplicationObjective { + + override def isObjectiveMet(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Boolean = { + peers.exists(_.topologyInfo == blockManagerId.topologyInfo) + } + + override def getPeersToMeetObjective(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Set[BlockManagerId] = { + peers.filter(_.topologyInfo == blockManagerId.topologyInfo) + } + + override def toString: String = "ReplicateBlockWithinRack" +} + +object ReplicateBlockOutsideRack + extends BlockReplicationObjective { + override def isObjectiveMet(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Boolean = { + peers.exists(_.topologyInfo != blockManagerId.topologyInfo) + } + + override def getPeersToMeetObjective(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Set[BlockManagerId] = { + peers.filter(_.topologyInfo != blockManagerId.topologyInfo) + } + + override def toString: String = "ReplicateBlockOutsideRack" +} + +object RandomlyReplicateBlock + extends BlockReplicationObjective { + override def isObjectiveMet(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Boolean = { + peers.nonEmpty + } + + override def getPeersToMeetObjective(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Set[BlockManagerId] = { + peers + } + + override def toString: String = "RandomlyReplicateBlock" +} + +object BlockReplicationOptimizer { + def getPeersToMeetObjectives(objectives: Set[BlockReplicationObjective], + peers: Set[BlockManagerId], + blockId: BlockId, + blockManagerId: BlockManagerId): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { + val peersMap = + peers.map(p => p -> objectives.filter(_.isObjectiveMet(blockManagerId, Set(p)))).toMap + val random = new Random(blockId.hashCode) + getOptimalPeers(peersMap, Set.empty, Set.empty, random) + } + + /** + * Greedy solution for set-cover formulation of the problem to get minimal number of peers + * that meet all the objectives. Each peer satisfies a subset of objectives, and we need to + * find a set of peers that together satisfy all or most objectives. This is done by + * 1. greedily picking a peer that satisfies most objectives at a given stage. To make results + * randomized, we break ties by choosing one or the other randomly. + * 2. Once chosen, the set of objectives satisfied by the peer are removed from + * all the remaining peers. + * 3. Go back to step 1 until we exhaust all peers, or existing peers don't satisfy any more + * objectives. 
+ * + * @param peersMap + * @param optimalPeers + * @return + */ + @tailrec + private def getOptimalPeers(peersMap: Map[BlockManagerId, Set[BlockReplicationObjective]], + optimalPeers: Set[BlockManagerId], + objectivesMetSoFar: Set[BlockReplicationObjective], + random: Random): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { + + if (peersMap.isEmpty) { + // we are done here, as no more peers left + (optimalPeers, objectivesMetSoFar) + } else { + val (maxPeer, objectivesMet) = + peersMap.reduce[(BlockManagerId, Set[BlockReplicationObjective])] + {case ((pmax, omax), (p, os)) => + if (omax.size > os.size) { + (pmax, omax) + } else if (omax.size == os.size) { + // size are the same, we randomly choose between the two with equal probability + if (random.nextDouble > 0.5) (pmax, omax) else (p, os) + } else { + (p, os) + } + } + if(objectivesMet.isEmpty) { + // we are done here since either no more objectives left, or + // no more peers left that satisfy any objectives + (optimalPeers, objectivesMetSoFar) + } else { + val updatedPeersMap = (peersMap - maxPeer).map {case (peer, objectives) => + peer -> (objectives diff objectivesMet) + } + getOptimalPeers( + updatedPeersMap, + optimalPeers + maxPeer, + objectivesMetSoFar ++ objectivesMet, + random) + } + } + } +} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala new file mode 100644 index 000000000000..43b121c34812 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +import scala.util.Random + +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite + +class BlockReplicationObjectives extends SparkFunSuite with Matchers { + val objectives = Set( + ReplicateToADifferentHost, + ReplicateBlockOutsideRack, + ReplicateBlockWithinRack + ) + + test("peers are all in the same rack") { + val blockManagerIds = generateBlockManagerIds(10, List("Default-rack")) + + val blockId = BlockId("test_block") + + val candidateBMId = generateBlockManagerIds(1, List("Default-rack")).head + + val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( + objectives, + blockManagerIds, + blockId, + candidateBMId) + + logInfo(s"Optimal peers : ${optimalPeers}") + logInfo(s"Objectives met : ${objectivesMet}") + assert(optimalPeers.size == 1) + } + + test("peers in 3 racks") { + val blockManagerIds = generateBlockManagerIds(10, List("/Rack1", "/Rack2", "/Rack3")) + val candidateBMId = generateBlockManagerIds(1, List("/Rack1", "/Rack2", "/Rack3")).head + val blockId = BlockId("test_block") + val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( + objectives, + blockManagerIds, + blockId, + candidateBMId) + + logInfo(s"Optimal peers : ${optimalPeers}") + logInfo(s"Objectives met : ${objectivesMet}") + assert(optimalPeers.size == 2) + assert(objectivesMet.size == 3) + assert(objectives.forall(_.isObjectiveMet(candidateBMId, optimalPeers))) + } + + private def generateBlockManagerIds(count: Int, racks: Seq[String]): Set[BlockManagerId] = { + (1 to count).map{i => + BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size)))) + }.toSet + } +} From daa7cb44e3f422456abb6e9ca7f134300da0f7a7 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 24 May 2016 14:11:45 -0400 Subject: [PATCH 17/31] Adding newline to the end of file --- .../org/apache/spark/storage/BlockReplicationObjectives.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 8c3759e29b74..bc054349afc8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -151,4 +151,4 @@ object BlockReplicationOptimizer { } } } -} \ No newline at end of file +} From fa55d5cbed3840b5216854b618335b064393b409 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 14 Jun 2016 18:31:52 -0400 Subject: [PATCH 18/31] Modifying the optimizer to use a modified greedy optimizer. We now try to get peers making sure objectives previously satisfied are not violated. 
--- .../storage/BlockReplicationObjectives.scala | 150 +++++++++--------- .../BlockReplicationPrioritization.scala | 39 +++++ .../storage/BlockReplicationObjectives.scala | 12 +- 3 files changed, 123 insertions(+), 78 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index bc054349afc8..1b7d59b9b2ef 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -17,77 +17,60 @@ package org.apache.spark.storage +import org.apache.spark.internal.Logging + import scala.annotation.tailrec import scala.util.Random trait BlockReplicationObjective { + val weight = 1 def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean - def getPeersToMeetObjective(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Set[BlockManagerId] + } -object ReplicateToADifferentHost +case object ReplicateToADifferentHost extends BlockReplicationObjective { override def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean = { peers.exists(_.host != blockManagerId.host) } - - override def getPeersToMeetObjective(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Set[BlockManagerId] = { - peers.filter(_.host != blockManagerId.host) - } - - override def toString: String = "ReplicateToADifferentHost" } -object ReplicateBlockWithinRack +case object ReplicateBlockWithinRack extends BlockReplicationObjective { override def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean = { peers.exists(_.topologyInfo == blockManagerId.topologyInfo) } - - override def getPeersToMeetObjective(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Set[BlockManagerId] = { - peers.filter(_.topologyInfo == blockManagerId.topologyInfo) - } - - override def toString: String = "ReplicateBlockWithinRack" } -object ReplicateBlockOutsideRack +case object ReplicateBlockOutsideRack extends BlockReplicationObjective { override def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean = { peers.exists(_.topologyInfo != blockManagerId.topologyInfo) } - - override def getPeersToMeetObjective(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Set[BlockManagerId] = { - peers.filter(_.topologyInfo != blockManagerId.topologyInfo) - } - - override def toString: String = "ReplicateBlockOutsideRack" } -object RandomlyReplicateBlock +case object RandomlyReplicateBlock extends BlockReplicationObjective { override def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean = { peers.nonEmpty } +} - override def getPeersToMeetObjective(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Set[BlockManagerId] = { - peers +case object NoTwoReplicasInSameRack + extends BlockReplicationObjective { + override def isObjectiveMet(blockManagerId: BlockManagerId, + peers: Set[BlockManagerId]): Boolean = { + val racksReplicatedTo = peers.map(_.topologyInfo).toSet.size + (peers.size == racksReplicatedTo) } - - override def toString: String = "RandomlyReplicateBlock" } -object BlockReplicationOptimizer { +object BlockReplicationOptimizer extends Logging { def getPeersToMeetObjectives(objectives: Set[BlockReplicationObjective], peers: Set[BlockManagerId], blockId: BlockId, @@ -95,60 +78,81 @@ object BlockReplicationOptimizer { val 
peersMap = peers.map(p => p -> objectives.filter(_.isObjectiveMet(blockManagerId, Set(p)))).toMap val random = new Random(blockId.hashCode) - getOptimalPeers(peersMap, Set.empty, Set.empty, random) + getOptimalPeers(peers, objectives, Set.empty, Set.empty, random, blockManagerId) } /** - * Greedy solution for set-cover formulation of the problem to get minimal number of peers - * that meet all the objectives. Each peer satisfies a subset of objectives, and we need to - * find a set of peers that together satisfy all or most objectives. This is done by - * 1. greedily picking a peer that satisfies most objectives at a given stage. To make results - * randomized, we break ties by choosing one or the other randomly. - * 2. Once chosen, the set of objectives satisfied by the peer are removed from - * all the remaining peers. - * 3. Go back to step 1 until we exhaust all peers, or existing peers don't satisfy any more - * objectives. - * - * @param peersMap + * Greedy solution for set-cover like formulation. + * 1. We see how many objectives each peer satisfies + * 2. We choose a peer whose addition to optimal peers set satisfies the most objectives + * while making sure any previously satisfied objectives are still satisfied. + * 3. Once chosen, we remove this peer from the set of candidates + * 4. Repeat till we either run out of peers, or existing peers don't satify any more new + * objectives + * @param peers + * @param objectivesLeft + * @param objectivesMet * @param optimalPeers + * @param random + * @param blockManagerId * @return */ @tailrec - private def getOptimalPeers(peersMap: Map[BlockManagerId, Set[BlockReplicationObjective]], - optimalPeers: Set[BlockManagerId], - objectivesMetSoFar: Set[BlockReplicationObjective], - random: Random): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { - - if (peersMap.isEmpty) { - // we are done here, as no more peers left - (optimalPeers, objectivesMetSoFar) + private def getOptimalPeers(peers: Set[BlockManagerId], + objectivesLeft: Set[BlockReplicationObjective], + objectivesMet: Set[BlockReplicationObjective], + optimalPeers: Set[BlockManagerId], + random: Random, + blockManagerId: BlockManagerId + ): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { + + logDebug(s"Objectives left : ${objectivesLeft.mkString(", ")}") + logDebug(s"Objectives met : ${objectivesMet.mkString(", ")}") + + if (peers.isEmpty) { + // we are done + (optimalPeers, objectivesMet) } else { - val (maxPeer, objectivesMet) = - peersMap.reduce[(BlockManagerId, Set[BlockReplicationObjective])] - {case ((pmax, omax), (p, os)) => - if (omax.size > os.size) { - (pmax, omax) - } else if (omax.size == os.size) { - // size are the same, we randomly choose between the two with equal probability - if (random.nextDouble > 0.5) (pmax, omax) else (p, os) + // we see how the addition of this peer to optimalPeers changes objectives left/met + // ideally, we want a peer whose addition, meets more objectives + // while making sure we still meet objectives met so far + val groupedPeers = peers.groupBy {p => + val peersSet = optimalPeers + p + val allPreviousObjectivesMet = + objectivesMet.forall(_.isObjectiveMet(blockManagerId, peersSet)) + if(allPreviousObjectivesMet) { + objectivesLeft.foldLeft(0) { case (c, o) => + val weight = if (o.isObjectiveMet(blockManagerId, peersSet)) o.weight else 0 + c + weight + } } else { - (p, os) + 0 } } - if(objectivesMet.isEmpty) { + + val (maxCount, maxPeers) = groupedPeers.maxBy {case (k, v) => k} + + logDebug(s"Peers ${maxPeers.mkString(", 
")} meet $maxCount objective/s") + + if(maxCount > 0) { + val maxPeer = maxPeers.toSeq(random.nextInt(maxPeers.size)) + val newOptimalPeers = optimalPeers + maxPeer + val newObjectivesMet = + objectivesLeft.filter(_.isObjectiveMet(blockManagerId, newOptimalPeers)) + val newObjectivesLeft = objectivesLeft diff newObjectivesMet + getOptimalPeers( + peers - maxPeer, + newObjectivesLeft, + objectivesMet ++ newObjectivesMet, + newOptimalPeers, + random, + blockManagerId) + } else { // we are done here since either no more objectives left, or // no more peers left that satisfy any objectives - (optimalPeers, objectivesMetSoFar) - } else { - val updatedPeersMap = (peersMap - maxPeer).map {case (peer, objectives) => - peer -> (objectives diff objectivesMet) - } - getOptimalPeers( - updatedPeersMap, - optimalPeers + maxPeer, - objectivesMetSoFar ++ objectivesMet, - random) + (optimalPeers, objectivesMet) } + } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 77911620ad6e..979e1f34f0ab 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -76,3 +76,42 @@ class DefaultBlockReplicationPrioritization ret } } + +@DeveloperApi +class PrioritizationWithObjectives + extends BlockReplicationPrioritization + with Logging { + val objectives: Set[BlockReplicationObjective] = Set( + ReplicateToADifferentHost, + ReplicateBlockOutsideRack, + ReplicateBlockWithinRack, + NoTwoReplicasInSameRack + ) + /** + * Method to prioritize a bunch of candidate peers of a block + * + * @param blockManagerId Id of the current BlockManager for self identification + * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. + * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority + */ + override def prioritize(blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] = { + val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( + objectives, + peers.toSet, + blockId, + blockManagerId + ) + logInfo(s"BlockReplication objectives met : ${objectivesMet.mkString(", ")}") + logInfo(s"Optimal peers : ${optimalPeers.mkString(", ")}") + // outside of the peers, we don't care about the order of peers, so we randomly shuffle + val r = new Random(blockId.hashCode) + val remainingPeers = peers.filter(p => !optimalPeers.contains(p)) + optimalPeers.toSeq ++ r.shuffle(remainingPeers) + } +} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 43b121c34812..81e1ea96fbe1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -24,10 +24,11 @@ import org.scalatest.Matchers import org.apache.spark.SparkFunSuite class BlockReplicationObjectives extends SparkFunSuite with Matchers { - val objectives = Set( + val objectives: Set[BlockReplicationObjective] = Set( ReplicateToADifferentHost, ReplicateBlockOutsideRack, - ReplicateBlockWithinRack + ReplicateBlockWithinRack, + NoTwoReplicasInSameRack ) test("peers are all in the same rack") { @@ -49,8 +50,9 @@ class BlockReplicationObjectives extends SparkFunSuite with Matchers { } test("peers in 3 racks") { - val blockManagerIds = generateBlockManagerIds(10, List("/Rack1", "/Rack2", "/Rack3")) - val candidateBMId = generateBlockManagerIds(1, List("/Rack1", "/Rack2", "/Rack3")).head + val racks = List("/Rack1", "/Rack2", "/Rack3") + val blockManagerIds = generateBlockManagerIds(10, racks) + val candidateBMId = generateBlockManagerIds(1, racks).head val blockId = BlockId("test_block") val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( objectives, @@ -61,7 +63,7 @@ class BlockReplicationObjectives extends SparkFunSuite with Matchers { logInfo(s"Optimal peers : ${optimalPeers}") logInfo(s"Objectives met : ${objectivesMet}") assert(optimalPeers.size == 2) - assert(objectivesMet.size == 3) + assert(objectivesMet.size == 4) assert(objectives.forall(_.isObjectiveMet(candidateBMId, optimalPeers))) } From 4cd86db32b44bae6d53d96807fdb9f71008cd81d Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 16 Jun 2016 18:45:41 -0400 Subject: [PATCH 19/31] Making sure we consider peers we have previously replicated to. --- .../storage/BlockReplicationObjectives.scala | 31 +++++++++++++------ .../BlockReplicationPrioritization.scala | 1 + .../storage/BlockReplicationObjectives.scala | 2 ++ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 1b7d59b9b2ef..354ab33f9b36 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -71,14 +71,27 @@ case object NoTwoReplicasInSameRack } object BlockReplicationOptimizer extends Logging { + + /** + * Get a minimal set of peers that meet the objectives. 
This is a "best-effort" implementation. + * + * @param objectives set of block replication objectives + * @param peers set of candidate peers + * @param peersReplicatedTo set of peers we have already replicated to. Empty set if no + * replicas so far + * @param blockId block Id of the block being replicated, as a source of randomness + * @param blockManagerId current blockManagerId, so we know where we are + * @return a tuple of set of optimal peers, and the objectives satisfied by the peers. + * Since this is a best-effort implemenation, all objectives might have been met. + */ def getPeersToMeetObjectives(objectives: Set[BlockReplicationObjective], peers: Set[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], blockId: BlockId, blockManagerId: BlockManagerId): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { - val peersMap = - peers.map(p => p -> objectives.filter(_.isObjectiveMet(blockManagerId, Set(p)))).toMap + val random = new Random(blockId.hashCode) - getOptimalPeers(peers, objectives, Set.empty, Set.empty, random, blockManagerId) + getOptimalPeers(peers, objectives, Set.empty, peersReplicatedTo, random, blockManagerId) } /** @@ -99,12 +112,12 @@ object BlockReplicationOptimizer extends Logging { */ @tailrec private def getOptimalPeers(peers: Set[BlockManagerId], - objectivesLeft: Set[BlockReplicationObjective], - objectivesMet: Set[BlockReplicationObjective], - optimalPeers: Set[BlockManagerId], - random: Random, - blockManagerId: BlockManagerId - ): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { + objectivesLeft: Set[BlockReplicationObjective], + objectivesMet: Set[BlockReplicationObjective], + optimalPeers: Set[BlockManagerId], + random: Random, + blockManagerId: BlockManagerId + ): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { logDebug(s"Objectives left : ${objectivesLeft.mkString(", ")}") logDebug(s"Objectives met : ${objectivesMet.mkString(", ")}") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 979e1f34f0ab..54adfab7f3aa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -104,6 +104,7 @@ class PrioritizationWithObjectives val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( objectives, peers.toSet, + peersReplicatedTo, blockId, blockManagerId ) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 81e1ea96fbe1..8b7cbaf0de62 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -41,6 +41,7 @@ class BlockReplicationObjectives extends SparkFunSuite with Matchers { val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( objectives, blockManagerIds, + Set.empty, blockId, candidateBMId) @@ -57,6 +58,7 @@ class BlockReplicationObjectives extends SparkFunSuite with Matchers { val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( objectives, blockManagerIds, + Set.empty, blockId, candidateBMId) From 69cfc458f1e828a2456f53982adaf3be68356c3c Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 24 Jun 2016 18:02:11 -0400 Subject: [PATCH 
--- .../org/apache/spark/storage/BlockReplicationObjectives.scala | 4 ++-- .../apache/spark/storage/BlockReplicationPrioritization.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 354ab33f9b36..570284a0c2ba 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -17,11 +17,11 @@ package org.apache.spark.storage -import org.apache.spark.internal.Logging - import scala.annotation.tailrec import scala.util.Random +import org.apache.spark.internal.Logging + trait BlockReplicationObjective { val weight = 1 def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 54adfab7f3aa..83c7df38eefb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -115,4 +115,4 @@ class PrioritizationWithObjectives val remainingPeers = peers.filter(p => !optimalPeers.contains(p)) optimalPeers.toSeq ++ r.shuffle(remainingPeers) } -} \ No newline at end of file +} From 265a24e4b0b19c8fea2b69825f31a96be0a79bbe Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 12 Jul 2016 16:53:32 -0400 Subject: [PATCH 21/31] Adding an assertion in a test case. --- .../org/apache/spark/storage/BlockReplicationObjectives.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 8b7cbaf0de62..058eee953937 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -48,6 +48,7 @@ class BlockReplicationObjectives extends SparkFunSuite with Matchers { logInfo(s"Optimal peers : ${optimalPeers}") logInfo(s"Objectives met : ${objectivesMet}") assert(optimalPeers.size == 1) + assert(objectivesMet.size == 3) } test("peers in 3 racks") { From 30ba3e920945dc5025cce377f20a25013b3df7ac Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Mon, 18 Jul 2016 11:03:34 -0400 Subject: [PATCH 22/31] Searching for the set of peers that meet the most new objectives is optimized.
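The earlier implementation scored every candidate peer with groupBy and then scanned the resulting map with maxBy; the candidates are now folded over once, carrying the best score seen so far together with the peers that tie for it. A minimal, self-contained sketch of this single-pass pattern follows; the score function is an illustrative stand-in for the weighted-objective computation in getOptimalPeers, and callers still check the returned score before using the tie set, mirroring the maxCount > 0 guard below.

    // Single-pass "max score with ties" accumulation, sketching the pattern
    // used in getOptimalPeers. `score` stands in for the weighted-objective
    // computation; a zero best score means no candidate helped.
    def bestByScore[T](candidates: Seq[T])(score: T => Int): (Int, Seq[T]) = {
      candidates.foldLeft((0, Seq.empty[T])) { case ((best, ties), c) =>
        val s = score(c)
        if (s > best) (s, Seq(c))             // strictly better: restart the tie set
        else if (s == best) (best, c +: ties) // ties the best score: accumulate
        else (best, ties)                     // worse: ignore
      }
    }

    // Example: the longest strings win; ties are kept for a random pick later.
    val (maxScore, winners) = bestByScore(Seq("rack1", "r2", "rack3"))(_.length)
    // maxScore == 5, winners == Seq("rack3", "rack1")

This keeps a single traversal over the candidates and avoids allocating the intermediate map that groupBy builds.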
--- .../storage/BlockReplicationObjectives.scala | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 570284a0c2ba..16d3d8ad2cca 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -129,22 +129,32 @@ object BlockReplicationOptimizer extends Logging { // we see how the addition of this peer to optimalPeers changes objectives left/met // ideally, we want a peer whose addition, meets more objectives // while making sure we still meet objectives met so far - val groupedPeers = peers.groupBy {p => - val peersSet = optimalPeers + p - val allPreviousObjectivesMet = - objectivesMet.forall(_.isObjectiveMet(blockManagerId, peersSet)) - if(allPreviousObjectivesMet) { - objectivesLeft.foldLeft(0) { case (c, o) => - val weight = if (o.isObjectiveMet(blockManagerId, peersSet)) o.weight else 0 - c + weight + + val (maxCount, maxPeers) = peers.foldLeft((0, Set.empty[BlockManagerId])) { + case ((prevMax, maxSet), peer) => + val peersSet = optimalPeers + peer + val allPreviousObjectivesMet = + objectivesMet.forall(_.isObjectiveMet(blockManagerId, peersSet)) + val score = if (allPreviousObjectivesMet) { + objectivesLeft.foldLeft(0) { case (c, o) => + val weight = if (o.isObjectiveMet(blockManagerId, peersSet)) o.weight else 0 + c + weight + } + } else { + 0 + } + if (score > prevMax) { + // we found a peer that gets us a higher score! + (score, Set(peer)) + } else if (score == prevMax) { + // this peer matches our highest score so far, add this and continue + (prevMax, maxSet + peer) + } else { + // this peer scores lower, we ignore it + (prevMax, maxSet) } - } else { - 0 - } } - val (maxCount, maxPeers) = groupedPeers.maxBy {case (k, v) => k} - logDebug(s"Peers ${maxPeers.mkString(", ")} meet $maxCount objective/s") if(maxCount > 0) { From 0abdb3ddd0d9fd599dad5a51a70e73d0392dcd3c Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 19 Jul 2016 13:32:21 -0400 Subject: [PATCH 23/31] Adding a basic HDFS like block replication strategy. Re-using BlockManagerReplicationSuite, to also run the same set of tests when using the basic strategy. Added a couple of specific test cases to verify prioritization. 
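The rule mirrors HDFS block placement: if no replica exists within the rack yet, pick one random in-rack peer on a different host; if no replica has left the rack yet, pick one random out-of-rack peer; the remaining peers are shuffled behind those two. A condensed sketch of the decision rule, where the parameters mirror values the prioritize method below derives from peersReplicatedTo and blockId:

    import scala.util.Random

    // Condensed sketch of the HDFS-style rule implemented in
    // BasicBlockReplicationPrioritization below; assumes BlockManagerId
    // (org.apache.spark.storage) is in scope.
    def hdfsStylePriority(
        self: BlockManagerId,
        peers: Seq[BlockManagerId],
        doneWithinRack: Boolean,
        doneOutsideRack: Boolean,
        random: Random): Seq[BlockManagerId] = {
      // pick one element uniformly at random, or none if the pool is empty
      def pickOne(pool: Seq[BlockManagerId]): Seq[BlockManagerId] =
        if (pool.isEmpty) Seq.empty else Seq(pool(random.nextInt(pool.size)))

      val inRack = peers.filter { p =>
        p.topologyInfo == self.topologyInfo && p.host != self.host
      }
      val outOfRack = peers.filter(_.topologyInfo != self.topologyInfo)

      (if (doneWithinRack) Seq.empty else pickOne(inRack)) ++
        (if (doneOutsideRack) Seq.empty else pickOne(outOfRack))
    }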
--- .../BlockReplicationPrioritization.scala | 76 ++++++++++++++++++- .../BlockManagerReplicationSuite.scala | 34 ++++++++- .../BlockReplicationStrategySuite.scala | 75 ++++++++++++++++++ 3 files changed, 179 insertions(+), 6 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala index 83c7df38eefb..b4b4f431af54 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging trait BlockReplicationPrioritization { /** - * Method to prioritize a bunch of candidate peers of a block + * Method to prioritize a bunch of candidate peers of a block manager * * @param blockManagerId Id of the current BlockManager for self identification * @param peers A list of peers of a BlockManager @@ -53,7 +53,7 @@ class DefaultBlockReplicationPrioritization with Logging { /** - * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation, + * Method to prioritize a bunch of candidate peers of a block manager. This is an implementation * that just makes sure we put blocks on different hosts, if possible * * @param blockManagerId Id of the current BlockManager for self identification @@ -80,7 +80,7 @@ class DefaultBlockReplicationPrioritization @DeveloperApi class PrioritizationWithObjectives extends BlockReplicationPrioritization - with Logging { + with Logging { val objectives: Set[BlockReplicationObjective] = Set( ReplicateToADifferentHost, ReplicateBlockOutsideRack, @@ -97,7 +97,8 @@ class PrioritizationWithObjectives * randomness if needed. * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - override def prioritize(blockManagerId: BlockManagerId, + override def prioritize( + blockManagerId: BlockManagerId, peers: Seq[BlockManagerId], peersReplicatedTo: Set[BlockManagerId], blockId: BlockId): Seq[BlockManagerId] = { @@ -116,3 +117,70 @@ class PrioritizationWithObjectives optimalPeers.toSeq ++ r.shuffle(remainingPeers) } } + +@DeveloperApi +class BasicBlockReplicationPrioritization + extends BlockReplicationPrioritization + with Logging { + + /** + * Method to prioritize a bunch of candidate peers of a block manager. This implementation + * replicates the behavior of block replication in HDFS, a peer is chosen within the rack, + * one outside and that's it. This works best with a total replication factor of 3. + * + * @param blockManagerId Id of the current BlockManager for self identification + * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. + * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority + */ + override def prioritize( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], + peersReplicatedTo: Set[BlockManagerId], + blockId: BlockId): Seq[BlockManagerId] = { + + logDebug(s"Input peers : $peers") + logDebug(s"BlockManagerId : $blockManagerId") + + val random = new Random(blockId.hashCode) + + val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo) + val peerWithinRack = if (doneWithinRack) { + // we are done with in-rack replication, so don't need anymore peers + Seq.empty[BlockManagerId] + } else { + // we choose an in-rack peer at random + val inRackPeers = peers.filter { p => + // we try to get peers within the same rack, but not the current host + p.topologyInfo == blockManagerId.topologyInfo && p.host != blockManagerId.host + } + + if(inRackPeers.isEmpty) { + Seq.empty + } else { + Seq(inRackPeers(random.nextInt(inRackPeers.size))) + } + } + val doneOutsideRack = peersReplicatedTo.exists(_.topologyInfo != blockManagerId.topologyInfo) + + val peerOutsideRack = if (doneOutsideRack) { + Seq.empty[BlockManagerId] + } else { + val outOfRackPeers = peers.filter(_.topologyInfo != blockManagerId.topologyInfo) + if(outOfRackPeers.isEmpty) { + Seq.empty + } else { + Seq(outOfRackPeers(random.nextInt(outOfRackPeers.size))) + } + } + + val priorityPeers = peerWithinRack ++ peerOutsideRack + + logInfo(s"Priority peers : $priorityPeers") + val remainingPeers = random.shuffle((peers.toSet diff priorityPeers.toSet).toSeq) + + priorityPeers ++ remainingPeers + } +} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index d5715f8469f7..131ce7c9b23a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager +import org.apache.spark.internal.Logging import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService @@ -36,6 +37,8 @@ import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.StorageLevel._ +import org.apache.spark.util.Utils + trait BlockManagerReplicationBehavior extends SparkFunSuite with Matchers @@ -43,6 +46,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite with LocalSparkContext { val conf: SparkConf + protected var rpcEnv: RpcEnv = null protected var master: BlockManagerMaster = null protected lazy val securityMgr = new SecurityManager(conf) @@ -55,7 +59,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite protected val allStores = new ArrayBuffer[BlockManager] // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test - protected lazy val serializer = new KryoSerializer(conf) // Implicitly convert strings to BlockIds for test clarity. 
@@ -471,7 +474,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav conf.set("spark.storage.replication.proactive", "true") conf.set("spark.storage.exceptionOnPinLeak", "true") - (2 to 5).foreach{ i => + (2 to 5).foreach { i => test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") { testProactiveReplication(i) } @@ -524,3 +527,30 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav } } } + +class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging { + // number of racks to test with + val numRacks = 3 + + /** + * Gets the topology information given the host name + * + * @param hostname Hostname + * @return random topology + */ + override def getTopologyForHost(hostname: String): Option[String] = { + Some(s"/Rack-${Utils.random.nextInt(numRacks)}") + } +} + +class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior { + val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test") + conf.set("spark.kryoserializer.buffer", "1m") + conf.set( + "spark.replication.topologyawareness.prioritizer", + classOf[BasicBlockReplicationPrioritization].getName) + conf.set( + "spark.replication.topologyawareness.topologyMapper", + classOf[DummyTopologyMapper].getName) + +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala new file mode 100644 index 000000000000..4aad5bd4c301 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +import scala.util.Random + +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite + +class BlockReplicationStrategySuite extends SparkFunSuite with Matchers { + val prioritizer = new BasicBlockReplicationPrioritization + val blockId = BlockId("test_block") + + test("All peers in the same rack") { + val racks = Seq("/default-rack") + + val peers = generateBlockManagerIds(10, racks) + val blockManager = BlockManagerId("Driver", "Host-driver", 10001, Some(racks.head)) + + val prioritizedPeers = prioritizer.prioritize( + blockManager, + peers.toSeq, + Set.empty, + blockId + ) + + assert(prioritizedPeers.size == peers.size) + val priorityPeer = prioritizedPeers.head + assert(priorityPeer.host != blockManager.host) + } + + test("Peers in 2 racks") { + val racks = Seq("/Rack-1", "/Rack-2") + + val peers = generateBlockManagerIds(10, racks) + val blockManager = BlockManagerId("Exec-1", "Host-1", 9001, Some(racks.head)) + + val prioritizedPeers = prioritizer.prioritize( + blockManager, + peers.toSeq, + Set.empty, + blockId + ) + + assert(prioritizedPeers.size == peers.size) + val priorityPeers = prioritizedPeers.take(2) + assert(priorityPeers.forall(p => p.host != blockManager.host)) + assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo)) + assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo)) + } + + + private def generateBlockManagerIds(count: Int, racks: Seq[String]): Set[BlockManagerId] = { + (1 to count).map{i => + BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size)))) + }.toSet + } + +} From 0a892f68fab2e73348f0e0e4d76f2368d4b0d2d9 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 22 Jul 2016 13:37:48 -0400 Subject: [PATCH 24/31] Running the suite for PrioritizerStrategy as well. --- .../spark/storage/BlockManagerReplicationSuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 131ce7c9b23a..07f46827a706 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -554,3 +554,15 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB classOf[DummyTopologyMapper].getName) } + +class BlockManagerPrioritizerReplicationSuite extends BlockManagerReplicationBehavior { + override val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test") + conf.set("spark.kryoserializer.buffer", "1m") + conf.set( + "spark.replication.topologyawareness.prioritizer", + "org.apache.spark.storage.PrioritizationWithObjectives") + conf.set( + "spark.replication.topologyawareness.topologyMapper", + "org.apache.spark.storage.DummyTopologyMapper" + ) +} From 485c9a1633cfd1b00b7b6186f2bf1950ea34e54e Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 22 Jul 2016 13:42:52 -0400 Subject: [PATCH 25/31] Renaming to follow naming convention. 
--- ...onObjectives.scala => BlockReplicationObjectivesSuite.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename core/src/test/scala/org/apache/spark/storage/{BlockReplicationObjectives.scala => BlockReplicationObjectivesSuite.scala} (97%) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectivesSuite.scala similarity index 97% rename from core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala rename to core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectivesSuite.scala index 058eee953937..c24e53e319f0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectivesSuite.scala @@ -23,7 +23,7 @@ import org.scalatest.Matchers import org.apache.spark.SparkFunSuite -class BlockReplicationObjectives extends SparkFunSuite with Matchers { +class BlockReplicationObjectivesSuite extends SparkFunSuite with Matchers { val objectives: Set[BlockReplicationObjective] = Set( ReplicateToADifferentHost, ReplicateBlockOutsideRack, From 88d60e272a520dc1ed1c5c89b815db5c846eea37 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 1 Nov 2016 15:14:26 -0400 Subject: [PATCH 26/31] Rebasing to master. Adding two strategies with numReplicas constraint, along with test cases. --- .../storage/BlockReplicationObjectives.scala | 99 +++++---- .../storage/BlockReplicationPolicy.scala | 198 ++++++++++++++++-- .../BlockReplicationPrioritization.scala | 186 ---------------- .../BlockManagerReplicationSuite.scala | 13 +- .../BlockReplicationObjectivesSuite.scala | 78 ------- .../storage/BlockReplicationPolicySuite.scala | 141 +++++++++++-- .../BlockReplicationStrategySuite.scala | 75 ------- 7 files changed, 375 insertions(+), 415 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala delete mode 100644 core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectivesSuite.scala delete mode 100644 core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala index 16d3d8ad2cca..0909e393d68b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala @@ -18,20 +18,23 @@ package org.apache.spark.storage import scala.annotation.tailrec +import scala.collection.mutable import scala.util.Random import org.apache.spark.internal.Logging + trait BlockReplicationObjective { val weight = 1 - def isObjectiveMet(blockManagerId: BlockManagerId, peers: Set[BlockManagerId]): Boolean + def isObjectiveMet(blockManagerId: BlockManagerId, peers: Seq[BlockManagerId]): Boolean } case object ReplicateToADifferentHost extends BlockReplicationObjective { - override def isObjectiveMet(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Boolean = { + override def isObjectiveMet( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId]): Boolean = { peers.exists(_.host != blockManagerId.host) } } @@ -39,32 +42,36 @@ case object ReplicateToADifferentHost case object ReplicateBlockWithinRack extends BlockReplicationObjective { - override def isObjectiveMet(blockManagerId: BlockManagerId, - peers: 
Set[BlockManagerId]): Boolean = { + override def isObjectiveMet( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId]): Boolean = { peers.exists(_.topologyInfo == blockManagerId.topologyInfo) } } case object ReplicateBlockOutsideRack extends BlockReplicationObjective { - override def isObjectiveMet(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Boolean = { + override def isObjectiveMet( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId]): Boolean = { peers.exists(_.topologyInfo != blockManagerId.topologyInfo) } } case object RandomlyReplicateBlock extends BlockReplicationObjective { - override def isObjectiveMet(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Boolean = { + override def isObjectiveMet( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId]): Boolean = { peers.nonEmpty } } case object NoTwoReplicasInSameRack extends BlockReplicationObjective { - override def isObjectiveMet(blockManagerId: BlockManagerId, - peers: Set[BlockManagerId]): Boolean = { + override def isObjectiveMet( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId]): Boolean = { val racksReplicatedTo = peers.map(_.topologyInfo).toSet.size (peers.size == racksReplicatedTo) } @@ -81,17 +88,28 @@ object BlockReplicationOptimizer extends Logging { * replicas so far * @param blockId block Id of the block being replicated, as a source of randomness * @param blockManagerId current blockManagerId, so we know where we are + * @param numReplicas Number of peers we need to replicate to * @return a tuple of set of optimal peers, and the objectives satisfied by the peers. * Since this is a best-effort implementation, not all objectives may have been met. */ - def getPeersToMeetObjectives(objectives: Set[BlockReplicationObjective], - peers: Set[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId, - blockManagerId: BlockManagerId): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { + def getPeersToMeetObjectives( + objectives: Set[BlockReplicationObjective], + peers: Seq[BlockManagerId], + peersReplicatedTo: mutable.HashSet[BlockManagerId], + blockId: BlockId, + blockManagerId: BlockManagerId, + numReplicas: Int): (Seq[BlockManagerId], Set[BlockReplicationObjective]) = { val random = new Random(blockId.hashCode) - getOptimalPeers(peers, objectives, Set.empty, peersReplicatedTo, random, blockManagerId) + getOptimalPeers( + peers.toSet, + objectives, + Set.empty, + peersReplicatedTo, + Seq.empty, + random, + blockManagerId, + numReplicas) } /** @@ -101,7 +119,7 @@ object BlockReplicationOptimizer extends Logging { * while making sure any previously satisfied objectives are still satisfied. * 3. Once chosen, we remove this peer from the set of candidates * 4.
Repeat until we either run out of peers, or existing peers don't satisfy any more new - * objectives + * objectives or we have met our numReplicas target * @param peers * @param objectivesLeft * @param objectivesMet @@ -111,18 +129,20 @@ object BlockReplicationOptimizer extends Logging { * @return */ @tailrec - private def getOptimalPeers(peers: Set[BlockManagerId], - objectivesLeft: Set[BlockReplicationObjective], - objectivesMet: Set[BlockReplicationObjective], - optimalPeers: Set[BlockManagerId], - random: Random, - blockManagerId: BlockManagerId - ): (Set[BlockManagerId], Set[BlockReplicationObjective]) = { + private def getOptimalPeers( + peers: Set[BlockManagerId], + objectivesLeft: Set[BlockReplicationObjective], + objectivesMet: Set[BlockReplicationObjective], + peersReplicatedTo: mutable.HashSet[BlockManagerId], + optimalPeers: Seq[BlockManagerId], + random: Random, + blockManagerId: BlockManagerId, + numReplicas: Int): (Seq[BlockManagerId], Set[BlockReplicationObjective]) = { logDebug(s"Objectives left : ${objectivesLeft.mkString(", ")}") logDebug(s"Objectives met : ${objectivesMet.mkString(", ")}") - if (peers.isEmpty) { + if (peers.isEmpty || optimalPeers.size == numReplicas) { // we are done (optimalPeers, objectivesMet) } else { @@ -130,36 +150,35 @@ object BlockReplicationOptimizer extends Logging { // ideally, we want a peer whose addition, meets more objectives // while making sure we still meet objectives met so far - val (maxCount, maxPeers) = peers.foldLeft((0, Set.empty[BlockManagerId])) { - case ((prevMax, maxSet), peer) => - val peersSet = optimalPeers + peer + val (maxCount, maxPeers) = peers.foldLeft((0, Seq.empty[BlockManagerId])) { + case ((prevMax, maxSeq), peer) => + val peersSet = peer +: (optimalPeers ++ peersReplicatedTo) val allPreviousObjectivesMet = objectivesMet.forall(_.isObjectiveMet(blockManagerId, peersSet)) val score = if (allPreviousObjectivesMet) { - objectivesLeft.foldLeft(0) { case (c, o) => - val weight = if (o.isObjectiveMet(blockManagerId, peersSet)) o.weight else 0 - c + weight - } + objectivesLeft.map{o => + if (o.isObjectiveMet(blockManagerId, peersSet)) o.weight else 0 + }.sum } else { 0 } if (score > prevMax) { // we found a peer that gets us a higher score!
- (score, Set(peer)) + (score, Seq(peer)) } else if (score == prevMax) { // this peer matches our highest score so far, add this and continue - (prevMax, maxSet + peer) + (prevMax, peer +: maxSeq) } else { // this peer scores lower, we ignore it - (prevMax, maxSet) + (prevMax, maxSeq) } } logDebug(s"Peers ${maxPeers.mkString(", ")} meet $maxCount objective/s") if(maxCount > 0) { - val maxPeer = maxPeers.toSeq(random.nextInt(maxPeers.size)) - val newOptimalPeers = optimalPeers + maxPeer + val maxPeer = maxPeers(random.nextInt(maxPeers.size)) + val newOptimalPeers = optimalPeers :+ maxPeer val newObjectivesMet = objectivesLeft.filter(_.isObjectiveMet(blockManagerId, newOptimalPeers)) val newObjectivesLeft = objectivesLeft diff newObjectivesMet @@ -167,9 +186,11 @@ object BlockReplicationOptimizer extends Logging { peers - maxPeer, newObjectivesLeft, objectivesMet ++ newObjectivesMet, + peersReplicatedTo, newOptimalPeers, random, - blockManagerId) + blockManagerId, + numReplicas) } else { // we are done here since either no more objectives left, or // no more peers left that satisfy any objectives diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala index bb8a684b4c7a..c5ffdb793b0c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala @@ -53,6 +53,48 @@ trait BlockReplicationPolicy { numReplicas: Int): List[BlockManagerId] } +object BlockReplicationUtils { + // scalastyle:off line.size.limit + /** + * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while + * minimizing space usage. Please see + * here. + * + * @param n total number of indices + * @param m number of samples needed + * @param r random number generator + * @return list of m random unique indices + */ + // scalastyle:on line.size.limit + private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = { + val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) => + val t = r.nextInt(i) + 1 + if (set.contains(t)) set + i else set + t + } + // we shuffle the result to ensure a random arrangement within the sample + // to avoid any bias from set implementations + r.shuffle(indices.map(_ - 1).toList) + } + + /** + * Get a random sample of size m from the elems + * + * @param elems + * @param m number of samples needed + * @param r random number generator + * @tparam T + * @return a random list of size m. If there are fewer than m elements in elems, we just + * randomly shuffle elems + */ + def getRandomSample[T](elems: Seq[T], m: Int, r: Random): List[T] = { + if (elems.size > m) { + getSampleIds(elems.size, m, r).map(elems(_)) + } else { + r.shuffle(elems).toList + } + } +} + @DeveloperApi class RandomBlockReplicationPolicy extends BlockReplicationPolicy @@ -67,6 +109,7 @@ class RandomBlockReplicationPolicy * @param peersReplicatedTo Set of peers already replicated to * @param blockId BlockId of the block being replicated. This can be used as a source of * randomness if needed. + * @param numReplicas Number of peers we need to replicate to * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority */ override def prioritize( @@ -78,7 +121,7 @@ class RandomBlockReplicationPolicy val random = new Random(blockId.hashCode) logDebug(s"Input peers : ${peers.mkString(", ")}") val prioritizedPeers = if (peers.size > numReplicas) { - getSampleIds(peers.size, numReplicas, random).map(peers(_)) + BlockReplicationUtils.getRandomSample(peers, numReplicas, random) } else { if (peers.size < numReplicas) { logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.") @@ -88,26 +131,147 @@ class RandomBlockReplicationPolicy logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}") prioritizedPeers } +} + +@DeveloperApi +class BasicBlockReplicationPolicy + extends BlockReplicationPolicy + with Logging { - // scalastyle:off line.size.limit /** - * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while - * minimizing space usage. Please see - * here. + * Method to prioritize a bunch of candidate peers of a block manager. This implementation + * replicates the behavior of block replication in HDFS: a peer is chosen within the rack, + * one outside, and that's it. This works best with a total replication factor of 3. * - * @param n total number of indices - * @param m number of samples needed - * @param r random number generator - * @return list of m random unique indices + * @param blockManagerId Id of the current BlockManager for self identification + * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. + * @param numReplicas Number of peers we need to replicate to + * @return A prioritized list of peers. Lower the index of a peer, higher its priority */ - // scalastyle:on line.size.limit - private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = { - val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) => - val t = r.nextInt(i) + 1 - if (set.contains(t)) set + i else set + t + override def prioritize( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], + peersReplicatedTo: mutable.HashSet[BlockManagerId], + blockId: BlockId, + numReplicas: Int): List[BlockManagerId] = { + + logDebug(s"Input peers : $peers") + logDebug(s"BlockManagerId : $blockManagerId") + + val random = new Random(blockId.hashCode) + + // if block doesn't have topology info, we can't do much, so we randomly shuffle + // if there is, we see what's needed from peersReplicatedTo and based on numReplicas, + // we choose what's needed + if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) { + // no topology info for the block.
The best we can do is randomly choose peers + BlockReplicationUtils.getRandomSample(peers, numReplicas, random) + } else { + // we have topology information, we see what is left to be done from peersReplicatedTo + val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo) + val doneOutsideRack = peersReplicatedTo.exists { p => + p.topologyInfo.isDefined && p.topologyInfo != blockManagerId.topologyInfo + } + + if (doneOutsideRack && doneWithinRack) { + // we are done, we just return a random sample + BlockReplicationUtils.getRandomSample(peers, numReplicas, random) + } else { + // we separate peers within and outside rack + val (inRackPeers, outOfRackPeers) = peers + .filter(_.host != blockManagerId.host) + .partition(_.topologyInfo == blockManagerId.topologyInfo) + + val peerWithinRack = if (doneWithinRack) { + // we are done with in-rack replication, so don't need anymore peers + Seq.empty + } else { + if (inRackPeers.isEmpty) { + Seq.empty + } else { + Seq(inRackPeers(random.nextInt(inRackPeers.size))) + } + } + + val peerOutsideRack = if (doneOutsideRack || numReplicas - peerWithinRack.size <= 0) { + Seq.empty + } else { + if (outOfRackPeers.isEmpty) { + Seq.empty + } else { + Seq(outOfRackPeers(random.nextInt(outOfRackPeers.size))) + } + } + + val priorityPeers = peerWithinRack ++ peerOutsideRack + val numRemainingPeers = numReplicas - priorityPeers.size + val remainingPeers = if (numRemainingPeers > 0) { + val rPeers = peers.filter(p => !priorityPeers.contains(p)) + BlockReplicationUtils.getRandomSample(rPeers, numRemainingPeers, random) + } else { + Seq.empty + } + + (priorityPeers ++ remainingPeers).toList + } + } - // we shuffle the result to ensure a random arrangement within the sample - // to avoid any bias from set implementations - r.shuffle(indices.map(_ - 1).toList) + } + +} + +@DeveloperApi +class ObjectivesBasedReplicationPolicy + extends BlockReplicationPolicy + with Logging { + val objectives: Set[BlockReplicationObjective] = Set( + ReplicateToADifferentHost, + ReplicateBlockOutsideRack, + ReplicateBlockWithinRack, + NoTwoReplicasInSameRack + ) + + /** + * Method to prioritize a bunch of candidate peers of a block + * + * @param blockManagerId Id of the current BlockManager for self identification + * @param peers A list of peers of a BlockManager + * @param peersReplicatedTo Set of peers already replicated to + * @param blockId BlockId of the block being replicated. This can be used as a source of + * randomness if needed. + * @param numReplicas Number of peers we need to replicate to + * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority + */ + override def prioritize( + blockManagerId: BlockManagerId, + peers: Seq[BlockManagerId], + peersReplicatedTo: mutable.HashSet[BlockManagerId], + blockId: BlockId, + numReplicas: Int): List[BlockManagerId] = { + val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( + objectives, + peers, + peersReplicatedTo, + blockId, + blockManagerId, + numReplicas + ) + logDebug(s"BlockReplication objectives met : ${objectivesMet.mkString(", ")}") + logDebug(s"Optimal peers : ${optimalPeers.mkString(", ")}") + // pad optimal peers with random peers if we are short + val numRemainingPeers = numReplicas - optimalPeers.size + val remainingPeers = if (numRemainingPeers > 0) { + val r = new Random(blockId.hashCode) + BlockReplicationUtils.getRandomSample( + peers.filter(p => !optimalPeers.contains(p)), + numRemainingPeers, + r) + } else { + List.empty + } + optimalPeers.toList ++ remainingPeers } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala deleted file mode 100644 index b4b4f431af54..000000000000 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import scala.util.Random - -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.Logging - -/** - * ::DeveloperApi:: - * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for - * replicating blocks - */ -@DeveloperApi -trait BlockReplicationPrioritization { - - /** - * Method to prioritize a bunch of candidate peers of a block manager - * - * @param blockManagerId Id of the current BlockManager for self identification - * @param peers A list of peers of a BlockManager - * @param peersReplicatedTo Set of peers already replicated to - * @param blockId BlockId of the block being replicated. This can be used as a source of - * randomness if needed. - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ - def prioritize( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] -} - -@DeveloperApi -class DefaultBlockReplicationPrioritization - extends BlockReplicationPrioritization - with Logging { - - /** - * Method to prioritize a bunch of candidate peers of a block manager. 
This is an implementation - * that just makes sure we put blocks on different hosts, if possible - * - * @param blockManagerId Id of the current BlockManager for self identification - * @param peers A list of peers of a BlockManager - * @param peersReplicatedTo Set of peers already replicated to - * @param blockId BlockId of the block being replicated. This can be used as a source of - * randomness if needed. - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ - override def prioritize( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] = { - val random = new Random(blockId.hashCode) - - logDebug(s"Input peers : ${peers.mkString(", ")}") - val ret = random.shuffle(peers) - logDebug(s"Prioritized peers : ${ret.mkString(", ")}") - ret - } -} - -@DeveloperApi -class PrioritizationWithObjectives - extends BlockReplicationPrioritization - with Logging { - val objectives: Set[BlockReplicationObjective] = Set( - ReplicateToADifferentHost, - ReplicateBlockOutsideRack, - ReplicateBlockWithinRack, - NoTwoReplicasInSameRack - ) - /** - * Method to prioritize a bunch of candidate peers of a block - * - * @param blockManagerId Id of the current BlockManager for self identification - * @param peers A list of peers of a BlockManager - * @param peersReplicatedTo Set of peers already replicated to - * @param blockId BlockId of the block being replicated. This can be used as a source of - * randomness if needed. - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ - override def prioritize( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] = { - val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( - objectives, - peers.toSet, - peersReplicatedTo, - blockId, - blockManagerId - ) - logInfo(s"BlockReplication objectives met : ${objectivesMet.mkString(", ")}") - logInfo(s"Optimal peers : ${optimalPeers.mkString(", ")}") - // outside of the peers, we don't care about the order of peers, so we randomly shuffle - val r = new Random(blockId.hashCode) - val remainingPeers = peers.filter(p => !optimalPeers.contains(p)) - optimalPeers.toSeq ++ r.shuffle(remainingPeers) - } -} - -@DeveloperApi -class BasicBlockReplicationPrioritization - extends BlockReplicationPrioritization - with Logging { - - /** - * Method to prioritize a bunch of candidate peers of a block manager. This implementation - * replicates the behavior of block replication in HDFS, a peer is chosen within the rack, - * one outside and that's it. This works best with a total replication factor of 3. - * - * @param blockManagerId Id of the current BlockManager for self identification - * @param peers A list of peers of a BlockManager - * @param peersReplicatedTo Set of peers already replicated to - * @param blockId BlockId of the block being replicated. This can be used as a source of - * randomness if needed. - * @return A prioritized list of peers. 
Lower the index of a peer, higher its priority - */ - override def prioritize( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId], - peersReplicatedTo: Set[BlockManagerId], - blockId: BlockId): Seq[BlockManagerId] = { - - logDebug(s"Input peers : $peers") - logDebug(s"BlockManagerId : $blockManagerId") - - val random = new Random(blockId.hashCode) - - val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo) - val peerWithinRack = if (doneWithinRack) { - // we are done with in-rack replication, so don't need anymore peers - Seq.empty[BlockManagerId] - } else { - // we choose an in-rack peer at random - val inRackPeers = peers.filter { p => - // we try to get peers within the same rack, but not the current host - p.topologyInfo == blockManagerId.topologyInfo && p.host != blockManagerId.host - } - - if(inRackPeers.isEmpty) { - Seq.empty - } else { - Seq(inRackPeers(random.nextInt(inRackPeers.size))) - } - } - val doneOutsideRack = peersReplicatedTo.exists(_.topologyInfo != blockManagerId.topologyInfo) - - val peerOutsideRack = if (doneOutsideRack) { - Seq.empty[BlockManagerId] - } else { - val outOfRackPeers = peers.filter(_.topologyInfo != blockManagerId.topologyInfo) - if(outOfRackPeers.isEmpty) { - Seq.empty - } else { - Seq(outOfRackPeers(random.nextInt(outOfRackPeers.size))) - } - } - - val priorityPeers = peerWithinRack ++ peerOutsideRack - - logInfo(s"Priority peers : $priorityPeers") - val remainingPeers = random.shuffle((peers.toSet diff priorityPeers.toSet).toSeq) - - priorityPeers ++ remainingPeers - } -} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 07f46827a706..d87730012964 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -547,12 +547,11 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test") conf.set("spark.kryoserializer.buffer", "1m") conf.set( - "spark.replication.topologyawareness.prioritizer", - classOf[BasicBlockReplicationPrioritization].getName) + "spark.storage.replication.policy", + classOf[BasicBlockReplicationPolicy].getName) conf.set( - "spark.replication.topologyawareness.topologyMapper", + "spark.storage.replication.topologyMapper", classOf[DummyTopologyMapper].getName) - } class BlockManagerPrioritizerReplicationSuite extends BlockManagerReplicationBehavior { @@ -560,9 +559,9 @@ class BlockManagerPrioritizerReplicationSuite extends BlockManagerReplicationBeh conf.set("spark.kryoserializer.buffer", "1m") conf.set( "spark.replication.topologyawareness.prioritizer", - "org.apache.spark.storage.PrioritizationWithObjectives") + classOf[ObjectivesBasedReplicationPolicy].getName) conf.set( - "spark.replication.topologyawareness.topologyMapper", - "org.apache.spark.storage.DummyTopologyMapper" + "spark.storage.replication.topologyMapper", + classOf[DummyTopologyMapper].getName ) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectivesSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectivesSuite.scala deleted file mode 100644 index c24e53e319f0..000000000000 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationObjectivesSuite.scala +++ /dev/null @@ -1,78 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import scala.util.Random - -import org.scalatest.Matchers - -import org.apache.spark.SparkFunSuite - -class BlockReplicationObjectivesSuite extends SparkFunSuite with Matchers { - val objectives: Set[BlockReplicationObjective] = Set( - ReplicateToADifferentHost, - ReplicateBlockOutsideRack, - ReplicateBlockWithinRack, - NoTwoReplicasInSameRack - ) - - test("peers are all in the same rack") { - val blockManagerIds = generateBlockManagerIds(10, List("Default-rack")) - - val blockId = BlockId("test_block") - - val candidateBMId = generateBlockManagerIds(1, List("Default-rack")).head - - val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( - objectives, - blockManagerIds, - Set.empty, - blockId, - candidateBMId) - - logInfo(s"Optimal peers : ${optimalPeers}") - logInfo(s"Objectives met : ${objectivesMet}") - assert(optimalPeers.size == 1) - assert(objectivesMet.size == 3) - } - - test("peers in 3 racks") { - val racks = List("/Rack1", "/Rack2", "/Rack3") - val blockManagerIds = generateBlockManagerIds(10, racks) - val candidateBMId = generateBlockManagerIds(1, racks).head - val blockId = BlockId("test_block") - val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( - objectives, - blockManagerIds, - Set.empty, - blockId, - candidateBMId) - - logInfo(s"Optimal peers : ${optimalPeers}") - logInfo(s"Objectives met : ${objectivesMet}") - assert(optimalPeers.size == 2) - assert(objectivesMet.size == 4) - assert(objectives.forall(_.isObjectiveMet(candidateBMId, optimalPeers))) - } - - private def generateBlockManagerIds(count: Int, racks: Seq[String]): Set[BlockManagerId] = { - (1 to count).map{i => - BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size)))) - }.toSet - } -} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala index 800c3899f1a7..a4cd8c7afd0b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala @@ -18,34 +18,34 @@ package org.apache.spark.storage import scala.collection.mutable +import scala.util.Random import org.scalatest.{BeforeAndAfter, Matchers} import org.apache.spark.{LocalSparkContext, SparkFunSuite} -class BlockReplicationPolicySuite extends SparkFunSuite - with Matchers - with BeforeAndAfter - with LocalSparkContext { +trait RandomBlockReplicationPolicyBehavior extends SparkFunSuite + with Matchers + with BeforeAndAfter + with LocalSparkContext { // Implicitly convert strings to BlockIds for test 
clarity. - private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) + protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) + val replicationPolicy: BlockReplicationPolicy + + val blockId = "test-block" /** * Test if we get the required number of peers when using random sampling from - * RandomBlockReplicationPolicy + * BlockReplicationPolicy */ - test(s"block replication - random block replication policy") { + test("block replication - random block replication policy") { val numBlockManagers = 10 val storeSize = 1000 - val blockManagers = (1 to numBlockManagers).map { i => - BlockManagerId(s"store-$i", "localhost", 1000 + i, None) - } + val blockManagers = generateBlockManagerIds(numBlockManagers, Seq("/Rack-1")) val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None) - val replicationPolicy = new RandomBlockReplicationPolicy - val blockId = "test-block" - (1 to 10).foreach {numReplicas => + (1 to 10).foreach { numReplicas => logDebug(s"Num replicas : $numReplicas") val randomPeers = replicationPolicy.prioritize( candidateBlockManager, @@ -68,7 +68,122 @@ class BlockReplicationPolicySuite extends SparkFunSuite logDebug(s"Random peers : ${secondPass.mkString(", ")}") assert(secondPass.toSet.size === numReplicas) } + } + + protected def generateBlockManagerIds(count: Int, racks: Seq[String]): Seq[BlockManagerId] = { + (1 to count).map{i => + BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size)))) + } + } +} + +trait TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior { + test("All peers in the same rack") { + val racks = Seq("/default-rack") + val numBlockManager = 10 + (1 to 10).foreach {numReplicas => + val peers = generateBlockManagerIds(numBlockManager, racks) + val blockManager = BlockManagerId("Driver", "Host-driver", 10001, Some(racks.head)) + + val prioritizedPeers = replicationPolicy.prioritize( + blockManager, + peers, + mutable.HashSet.empty, + blockId, + numReplicas + ) + + assert(prioritizedPeers.toSet.size == numReplicas) + assert(prioritizedPeers.forall(p => p.host != blockManager.host)) + } + } + + test("Peers in 2 racks") { + val racks = Seq("/Rack-1", "/Rack-2") + (1 to 10).foreach {numReplicas => + val peers = generateBlockManagerIds(10, racks) + val blockManager = BlockManagerId("Driver", "Host-driver", 9001, Some(racks.head)) + + val prioritizedPeers = replicationPolicy.prioritize( + blockManager, + peers, + mutable.HashSet.empty, + blockId, + numReplicas + ) + + assert(prioritizedPeers.toSet.size == numReplicas) + val priorityPeers = prioritizedPeers.take(2) + assert(priorityPeers.forall(p => p.host != blockManager.host)) + if(numReplicas > 1) { + // both these conditions should be satisfied when numReplicas > 1 + assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo)) + assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo)) + } + } + } +} + +class RandomBlockReplicationPolicySuite extends RandomBlockReplicationPolicyBehavior { + override val replicationPolicy = new RandomBlockReplicationPolicy +} + +class BasicBlockReplicationPolicySuite extends TopologyAwareBlockReplicationPolicyBehavior { + override val replicationPolicy = new BasicBlockReplicationPolicy +} + +class ObjectivesBasedReplicationPolicySuite extends TopologyAwareBlockReplicationPolicyBehavior { + override val replicationPolicy = new ObjectivesBasedReplicationPolicy + + val objectives: 
Set[BlockReplicationObjective] = Set( + ReplicateToADifferentHost, + ReplicateBlockOutsideRack, + ReplicateBlockWithinRack, + NoTwoReplicasInSameRack + ) + + test("peers are all in the same rack") { + val blockManagerIds = generateBlockManagerIds(10, List("Default-rack")) + + val blockId = BlockId("test_block") + + val candidateBMId = generateBlockManagerIds(1, List("Default-rack")).head + + val numReplicas = 2 + + val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( + objectives, + blockManagerIds, + mutable.HashSet.empty, + blockId, + candidateBMId, + numReplicas) + + logDebug(s"Optimal peers : ${optimalPeers}") + logDebug(s"Objectives met : ${objectivesMet}") + assert(optimalPeers.size == 1) + assert(objectivesMet.size == 3) + } + + test("peers in 3 racks") { + val racks = List("/Rack1", "/Rack2", "/Rack3") + val blockManagerIds = generateBlockManagerIds(10, racks) + val candidateBMId = generateBlockManagerIds(1, racks).head + val blockId = BlockId("test_block") + val numReplicas = 2 + val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( + objectives, + blockManagerIds, + mutable.HashSet.empty, + blockId, + candidateBMId, + numReplicas) + logDebug(s"Optimal peers : ${optimalPeers}") + logDebug(s"Objectives met : ${objectivesMet}") + assert(optimalPeers.size == 2) + assert(objectivesMet.size == 4) + assert(objectives.forall(_.isObjectiveMet(candidateBMId, optimalPeers))) } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala deleted file mode 100644 index 4aad5bd4c301..000000000000 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationStrategySuite.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.storage - -import scala.util.Random - -import org.scalatest.Matchers - -import org.apache.spark.SparkFunSuite - -class BlockReplicationStrategySuite extends SparkFunSuite with Matchers { - val prioritizer = new BasicBlockReplicationPrioritization - val blockId = BlockId("test_block") - - test("All peers in the same rack") { - val racks = Seq("/default-rack") - - val peers = generateBlockManagerIds(10, racks) - val blockManager = BlockManagerId("Driver", "Host-driver", 10001, Some(racks.head)) - - val prioritizedPeers = prioritizer.prioritize( - blockManager, - peers.toSeq, - Set.empty, - blockId - ) - - assert(prioritizedPeers.size == peers.size) - val priorityPeer = prioritizedPeers.head - assert(priorityPeer.host != blockManager.host) - } - - test("Peers in 2 racks") { - val racks = Seq("/Rack-1", "/Rack-2") - - val peers = generateBlockManagerIds(10, racks) - val blockManager = BlockManagerId("Exec-1", "Host-1", 9001, Some(racks.head)) - - val prioritizedPeers = prioritizer.prioritize( - blockManager, - peers.toSeq, - Set.empty, - blockId - ) - - assert(prioritizedPeers.size == peers.size) - val priorityPeers = prioritizedPeers.take(2) - assert(priorityPeers.forall(p => p.host != blockManager.host)) - assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo)) - assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo)) - } - - - private def generateBlockManagerIds(count: Int, racks: Seq[String]): Set[BlockManagerId] = { - (1 to count).map{i => - BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size)))) - }.toSet - } - -} From f638f025a3239a53e422a3b31e707352551b4c8b Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Tue, 1 Nov 2016 17:04:29 -0400 Subject: [PATCH 27/31] Fixing the test case to use the right conf parameter to set the replication policy. --- .../org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index d87730012964..a489ca08af22 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -558,7 +558,7 @@ class BlockManagerPrioritizerReplicationSuite extends BlockManagerReplicationBeh override val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test") conf.set("spark.kryoserializer.buffer", "1m") conf.set( - "spark.replication.topologyawareness.prioritizer", + "spark.storage.replication.policy", classOf[ObjectivesBasedReplicationPolicy].getName) conf.set( "spark.storage.replication.topologyMapper", From bdf69d6fb38e7aa68e1d33bb25abc83c5d4ed4b1 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Wed, 2 Nov 2016 09:56:51 -0400 Subject: [PATCH 28/31] Fixing style errors. 
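For reference, the key corrected in the previous patch is the one used to plug in a policy. A minimal usage sketch follows (illustrative only, not part of this patch: it assumes the policy class is instantiated reflectively from this key and therefore needs a no-arg constructor; DefaultTopologyMapper is an assumed production mapper, while the suites in this series use a test-only DummyTopologyMapper):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.{BasicBlockReplicationPolicy, DefaultTopologyMapper}

    // Select a replication policy and a topology mapper through the same conf
    // keys exercised by BlockManagerReplicationSuite in this series.
    val conf = new SparkConf(false)
      .set("spark.app.id", "replication-example")
      .set("spark.storage.replication.policy",
        classOf[BasicBlockReplicationPolicy].getName)
      .set("spark.storage.replication.topologyMapper",
        classOf[DefaultTopologyMapper].getName)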
--- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 1 - .../org/apache/spark/storage/BlockManagerReplicationSuite.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5f9ea1055375..cd885c866e7c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1281,7 +1281,6 @@ private[spark] class BlockManager( numPeersToReplicateTo - peersReplicatedTo.size) } } - logDebug(s"Replicating $blockId of ${data.size} bytes to " + s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index a489ca08af22..e36144be1c57 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -39,7 +39,6 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.StorageLevel._ import org.apache.spark.util.Utils - trait BlockManagerReplicationBehavior extends SparkFunSuite with Matchers with BeforeAndAfter From d17f6e9f0103e0bf01ebfa8b961e6758ea27b693 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Fri, 24 Feb 2017 16:42:39 +0000 Subject: [PATCH 29/31] Incorporating suggestions from @sameeragarwal. Removing the objectives-based replication strategy and the constraint solver associated with it. --- .../storage/BlockReplicationObjectives.scala | 202 ------------------ .../storage/BlockReplicationPolicy.scala | 53 ----- .../BlockManagerReplicationSuite.scala | 11 - .../storage/BlockReplicationPolicySuite.scala | 72 +------ 4 files changed, 5 insertions(+), 333 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala deleted file mode 100644 index 0909e393d68b..000000000000 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.storage - -import scala.annotation.tailrec -import scala.collection.mutable -import scala.util.Random - -import org.apache.spark.internal.Logging - - -trait BlockReplicationObjective { - val weight = 1 - def isObjectiveMet(blockManagerId: BlockManagerId, peers: Seq[BlockManagerId]): Boolean - -} - -case object ReplicateToADifferentHost - extends BlockReplicationObjective { - override def isObjectiveMet( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId]): Boolean = { - peers.exists(_.host != blockManagerId.host) - } -} - -case object ReplicateBlockWithinRack - extends BlockReplicationObjective { - - override def isObjectiveMet( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId]): Boolean = { - peers.exists(_.topologyInfo == blockManagerId.topologyInfo) - } -} - -case object ReplicateBlockOutsideRack - extends BlockReplicationObjective { - override def isObjectiveMet( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId]): Boolean = { - peers.exists(_.topologyInfo != blockManagerId.topologyInfo) - } -} - -case object RandomlyReplicateBlock - extends BlockReplicationObjective { - override def isObjectiveMet( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId]): Boolean = { - peers.nonEmpty - } -} - -case object NoTwoReplicasInSameRack - extends BlockReplicationObjective { - override def isObjectiveMet( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId]): Boolean = { - val racksReplicatedTo = peers.map(_.topologyInfo).toSet.size - (peers.size == racksReplicatedTo) - } -} - -object BlockReplicationOptimizer extends Logging { - - /** - * Get a minimal set of peers that meet the objectives. This is a "best-effort" implementation. - * - * @param objectives set of block replication objectives - * @param peers set of candidate peers - * @param peersReplicatedTo set of peers we have already replicated to. Empty set if no - * replicas so far - * @param blockId block Id of the block being replicated, as a source of randomness - * @param blockManagerId current blockManagerId, so we know where we are - * @param numReplicas Number of peers we need to replicate to - * @return a tuple of set of optimal peers, and the objectives satisfied by the peers. - * Since this is a best-effort implementation, not all objectives might have been met. - */ - def getPeersToMeetObjectives( - objectives: Set[BlockReplicationObjective], - peers: Seq[BlockManagerId], - peersReplicatedTo: mutable.HashSet[BlockManagerId], - blockId: BlockId, - blockManagerId: BlockManagerId, - numReplicas: Int): (Seq[BlockManagerId], Set[BlockReplicationObjective]) = { - - val random = new Random(blockId.hashCode) - getOptimalPeers( - peers.toSet, - objectives, - Set.empty, - peersReplicatedTo, - Seq.empty, - random, - blockManagerId, - numReplicas) - } - - /** - * Greedy solution for set-cover like formulation. - * 1. We see how many objectives each peer satisfies - * 2. We choose a peer whose addition to optimal peers set satisfies the most objectives - * while making sure any previously satisfied objectives are still satisfied. - * 3. Once chosen, we remove this peer from the set of candidates - * 4.
Repeat till we either run out of peers, or existing peers don't satisfy any more new - * objectives or we have met our numReplicas target - * @param peers - * @param objectivesLeft - * @param objectivesMet - * @param optimalPeers - * @param random - * @param blockManagerId - * @return - */ - @tailrec - private def getOptimalPeers( - peers: Set[BlockManagerId], - objectivesLeft: Set[BlockReplicationObjective], - objectivesMet: Set[BlockReplicationObjective], - peersReplicatedTo: mutable.HashSet[BlockManagerId], - optimalPeers: Seq[BlockManagerId], - random: Random, - blockManagerId: BlockManagerId, - numReplicas: Int): (Seq[BlockManagerId], Set[BlockReplicationObjective]) = { - - logDebug(s"Objectives left : ${objectivesLeft.mkString(", ")}") - logDebug(s"Objectives met : ${objectivesMet.mkString(", ")}") - - if (peers.isEmpty || optimalPeers.size == numReplicas) { - // we are done - (optimalPeers, objectivesMet) - } else { - // we see how the addition of this peer to optimalPeers changes objectives left/met - // ideally, we want a peer whose addition, meets more objectives - // while making sure we still meet objectives met so far - - val (maxCount, maxPeers) = peers.foldLeft((0, Seq.empty[BlockManagerId])) { - case ((prevMax, maxSeq), peer) => - val peersSet = peer +: (optimalPeers ++ peersReplicatedTo) - val allPreviousObjectivesMet = - objectivesMet.forall(_.isObjectiveMet(blockManagerId, peersSet)) - val score = if (allPreviousObjectivesMet) { - objectivesLeft.map{o => - if (o.isObjectiveMet(blockManagerId, peersSet)) o.weight else 0 - }.sum - } else { - 0 - } - if (score > prevMax) { - // we found a peer that gets us a higher score! - (score, Seq(peer)) - } else if (score == prevMax) { - // this peer matches our highest score so far, add this and continue - (prevMax, peer +: maxSeq) - } else { - // this peer scores lower, we ignore it - (prevMax, maxSeq) - } - } - - logDebug(s"Peers ${maxPeers.mkString(", ")} meet $maxCount objective/s") - - if(maxCount > 0) { - val maxPeer = maxPeers(random.nextInt(maxPeers.size)) - val newOptimalPeers = optimalPeers :+ maxPeer - val newObjectivesMet = - objectivesLeft.filter(_.isObjectiveMet(blockManagerId, newOptimalPeers)) - val newObjectivesLeft = objectivesLeft diff newObjectivesMet - getOptimalPeers( - peers - maxPeer, - newObjectivesLeft, - objectivesMet ++ newObjectivesMet, - peersReplicatedTo, - newOptimalPeers, - random, - blockManagerId, - numReplicas) - } else { - // we are done here since either no more objectives left, or - // no more peers left that satisfy any objectives - (optimalPeers, objectivesMet) - } - - } - } -} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala index c5ffdb793b0c..f2b8363470f2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala @@ -222,56 +222,3 @@ class BasicBlockReplicationPolicy } } - -@DeveloperApi -class ObjectivesBasedReplicationPolicy - extends BlockReplicationPolicy - with Logging { - val objectives: Set[BlockReplicationObjective] = Set( - ReplicateToADifferentHost, - ReplicateBlockOutsideRack, - ReplicateBlockWithinRack, - NoTwoReplicasInSameRack - ) - - /** - * Method to prioritize a bunch of candidate peers of a block - * - * @param blockManagerId Id of the current BlockManager for self identification - * @param peers A list of peers of a BlockManager - * @param
peersReplicatedTo Set of peers already replicated to - * @param blockId BlockId of the block being replicated. This can be used as a source of - * randomness if needed. - * @param numReplicas Number of peers we need to replicate to - * @return A prioritized list of peers. Lower the index of a peer, higher its priority - */ - override def prioritize( - blockManagerId: BlockManagerId, - peers: Seq[BlockManagerId], - peersReplicatedTo: mutable.HashSet[BlockManagerId], - blockId: BlockId, - numReplicas: Int): List[BlockManagerId] = { - val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( - objectives, - peers, - peersReplicatedTo, - blockId, - blockManagerId, - numReplicas - ) - logDebug(s"BlockReplication objectives met : ${objectivesMet.mkString(", ")}") - logDebug(s"Optimal peers : ${optimalPeers.mkString(", ")}") - // pad optimal peers with random peers if we are short - val numRemainingPeers = numReplicas - optimalPeers.size - val remainingPeers = if (numRemainingPeers > 0) { - val r = new Random(blockId.hashCode) - BlockReplicationUtils.getRandomSample( - peers.filter(p => !optimalPeers.contains(p)), - numRemainingPeers, - r) - } else { - List.empty - } - optimalPeers.toList ++ remainingPeers - } -} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index e36144be1c57..13020acdd3db 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -553,14 +553,3 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB classOf[DummyTopologyMapper].getName) } -class BlockManagerPrioritizerReplicationSuite extends BlockManagerReplicationBehavior { - override val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test") - conf.set("spark.kryoserializer.buffer", "1m") - conf.set( - "spark.storage.replication.policy", - classOf[ObjectivesBasedReplicationPolicy].getName) - conf.set( - "spark.storage.replication.topologyMapper", - classOf[DummyTopologyMapper].getName - ) -} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala index a4cd8c7afd0b..1e7c41b06f43 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala @@ -24,7 +24,7 @@ import org.scalatest.{BeforeAndAfter, Matchers} import org.apache.spark.{LocalSparkContext, SparkFunSuite} -trait RandomBlockReplicationPolicyBehavior extends SparkFunSuite +class RandomBlockReplicationPolicyBehavior extends SparkFunSuite with Matchers with BeforeAndAfter with LocalSparkContext { @@ -32,7 +32,7 @@ trait RandomBlockReplicationPolicyBehavior extends SparkFunSuite // Implicitly convert strings to BlockIds for test clarity. 
protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) - val replicationPolicy: BlockReplicationPolicy + val replicationPolicy: BlockReplicationPolicy = new RandomBlockReplicationPolicy val blockId = "test-block" /** @@ -77,7 +77,9 @@ trait RandomBlockReplicationPolicyBehavior extends SparkFunSuite } } -trait TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior { +class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior { + override val replicationPolicy = new BasicBlockReplicationPolicy + test("All peers in the same rack") { val racks = Seq("/default-rack") val numBlockManager = 10 @@ -123,67 +125,3 @@ trait TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplication } } } - -class RandomBlockReplicationPolicySuite extends RandomBlockReplicationPolicyBehavior { - override val replicationPolicy = new RandomBlockReplicationPolicy -} - -class BasicBlockReplicationPolicySuite extends TopologyAwareBlockReplicationPolicyBehavior { - override val replicationPolicy = new BasicBlockReplicationPolicy -} - -class ObjectivesBasedReplicationPolicySuite extends TopologyAwareBlockReplicationPolicyBehavior { - override val replicationPolicy = new ObjectivesBasedReplicationPolicy - - val objectives: Set[BlockReplicationObjective] = Set( - ReplicateToADifferentHost, - ReplicateBlockOutsideRack, - ReplicateBlockWithinRack, - NoTwoReplicasInSameRack - ) - - test("peers are all in the same rack") { - val blockManagerIds = generateBlockManagerIds(10, List("Default-rack")) - - val blockId = BlockId("test_block") - - val candidateBMId = generateBlockManagerIds(1, List("Default-rack")).head - - val numReplicas = 2 - - val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( - objectives, - blockManagerIds, - mutable.HashSet.empty, - blockId, - candidateBMId, - numReplicas) - - logDebug(s"Optimal peers : ${optimalPeers}") - logDebug(s"Objectives met : ${objectivesMet}") - assert(optimalPeers.size == 1) - assert(objectivesMet.size == 3) - } - - test("peers in 3 racks") { - val racks = List("/Rack1", "/Rack2", "/Rack3") - val blockManagerIds = generateBlockManagerIds(10, racks) - val candidateBMId = generateBlockManagerIds(1, racks).head - val blockId = BlockId("test_block") - val numReplicas = 2 - val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives( - objectives, - blockManagerIds, - mutable.HashSet.empty, - blockId, - candidateBMId, - numReplicas) - - logDebug(s"Optimal peers : ${optimalPeers}") - logDebug(s"Objectives met : ${objectivesMet}") - assert(optimalPeers.size == 2) - assert(objectivesMet.size == 4) - assert(objectives.forall(_.isObjectiveMet(candidateBMId, optimalPeers))) - } - -} From 794a7209b2e97a8b723ca764389914fe26e2b3db Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Thu, 23 Mar 2017 12:13:53 -0400 Subject: [PATCH 30/31] Incorporating suggestions from @cloud-fan --- .../spark/storage/BlockReplicationPolicy.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala index f2b8363470f2..353eac60df17 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala @@ -67,13 +67,11 @@ object BlockReplicationUtils { */ // scalastyle:on 
line.size.limit private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = { - val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) => + val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) { case (set, i) => val t = r.nextInt(i) + 1 if (set.contains(t)) set + i else set + t } - // we shuffle the result to ensure a random arrangement within the sample - // to avoid any bias from set implementations - r.shuffle(indices.map(_ - 1).toList) + indices.map(_ - 1).toList } /** @@ -140,8 +138,10 @@ class BasicBlockReplicationPolicy /** * Method to prioritize a bunch of candidate peers of a block manager. This implementation - * replicates the behavior of block replication in HDFS, a peer is chosen within the rack, - * one outside and that's it. This works best with a total replication factor of 3. + * replicates the behavior of block replication in HDFS. For a given number of replicas needed, + * we choose a peer within the rack, one outside, and the remaining block managers are chosen + * at random, in that order, until we meet the number of replicas needed. + * This works best with a total replication factor of 3, as in HDFS. * * @param blockManagerId Id of the current BlockManager for self identification * @param peers A list of peers of a BlockManager @@ -163,7 +163,7 @@ class BasicBlockReplicationPolicy val random = new Random(blockId.hashCode) - // if block doesn't have topology info, we can't do much, so we randlomly shuffle + // if the block doesn't have topology info, we can't do much, so we randomly shuffle // if there is, we see what's needed from peersReplicatedTo and based on numReplicas, // we choose whats needed if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) { From c465aaf044c73228bbd60aa4b717def734f338c8 Mon Sep 17 00:00:00 2001 From: Shubham Chopra Date: Mon, 27 Mar 2017 11:46:54 -0400 Subject: [PATCH 31/31] Correcting indentation. --- .../apache/spark/storage/BlockReplicationPolicySuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala index 1e7c41b06f43..ecad0f5352e5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala @@ -25,9 +25,9 @@ import org.scalatest.{BeforeAndAfter, Matchers} import org.apache.spark.{LocalSparkContext, SparkFunSuite} class RandomBlockReplicationPolicyBehavior extends SparkFunSuite - with Matchers - with BeforeAndAfter - with LocalSparkContext { + with Matchers + with BeforeAndAfter + with LocalSparkContext { // Implicitly convert strings to BlockIds for test clarity. protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
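A closing note on the getSampleIds change in PATCH 30: the loop is Robert Floyd's sampling algorithm, which draws m distinct values from 1..n using exactly m random draws. Switching the accumulator to mutable.LinkedHashSet preserves insertion order, so a seeded Random now yields a reproducible ordering without the explicit shuffle. A self-contained sketch (hypothetical standalone method, mirroring the patched code above):

    import scala.collection.mutable
    import scala.util.Random

    // Floyd's algorithm: sample m distinct indices from [0, n) in O(m).
    // LinkedHashSet keeps insertion order, so no post-hoc shuffle is needed
    // for a deterministic result from a seeded Random.
    def sampleIds(n: Int, m: Int, r: Random): List[Int] = {
      val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) {
        case (set, i) =>
          val t = r.nextInt(i) + 1 // uniform draw from [1, i]
          if (set.contains(t)) set += i else set += t // on collision, keep i itself
      }
      indices.map(_ - 1).toList // shift the 1-based values to 0-based indices
    }

    // e.g. sampleIds(10, 3, new Random(42)) returns 3 distinct ints in [0, 10)

The resulting order is Floyd's insertion order rather than a uniformly random permutation, but each of the n candidates is still included with equal probability, which is the property the replication policies rely on.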