From 926ea2551487f170b24f06a0d6c11879103b05b5 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 15 Mar 2017 10:42:49 +0800 Subject: [PATCH 1/3] optimize a location order of blocks with topology information --- .../apache/spark/storage/BlockManager.scala | 7 +++-- .../spark/storage/BlockManagerSuite.scala | 26 ++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 45b73380806d..c7acb553c639 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -555,12 +555,15 @@ private[spark] class BlockManager( /** * Return a list of locations for the given block, prioritizing the local machine since - * multiple block managers can share the same host. + * multiple block managers can share the same host, then try to get the same rack data. */ private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { val locs = Random.shuffle(master.getLocations(blockId)) val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } - preferredLocs ++ otherLocs + val (sameRackLocs, differentRackLocs) = otherLocs.partition { + loc => blockManagerId.topologyInfo == loc.topologyInfo + } + preferredLocs ++ sameRackLocs ++ differentRackLocs } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 64a67b4c4cba..7afb45ae61cb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -485,7 +485,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } - test("optimize a location order of blocks") { + test("optimize a location order of blocks without topology information") { val localHost = Utils.localHostName() val otherHost = "otherHost" val bmMaster = mock(classOf[BlockManagerMaster]) @@ -500,6 +500,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost)) } + test("optimize a location order of blocks with topology information") { + val localHost = Utils.localHostName() + val otherHost = "otherHost" + val localRack = "localRack" + val otherRack = "otherRack" + + val bmMaster = mock(classOf[BlockManagerMaster]) + val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack)) + val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack)) + val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack)) + val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack)) + val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack)) + when(bmMaster.getLocations(mc.any[BlockId])) + .thenReturn(Seq(bmId1, bmId2, bmId3, bmId3, bmId4, bmId5)) + + val blockManager = makeBlockManager(128, "exec", bmMaster) + val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) + val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) + assert(locations.map(_.host).toSet + === Set(localHost, localHost, otherHost, otherHost, otherHost)) + assert(locations.flatMap(_.topologyInfo).toSet + === Set(localRack, localRack, localRack, otherRack, otherRack)) + } + test("SPARK-9591: getRemoteBytes from another location when Exception throw") { conf.set("spark.shuffle.io.maxRetries", "0") store = makeBlockManager(8000, "executor1") From 56f5231626cceb114c45413b7b340ee719c3f2f8 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 22 Mar 2017 20:29:48 +0800 Subject: [PATCH 2/3] Address comments --- .../scala/org/apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/storage/BlockManagerSuite.scala | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c7acb553c639..4b965626a727 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -555,7 +555,7 @@ private[spark] class BlockManager( /** * Return a list of locations for the given block, prioritizing the local machine since - * multiple block managers can share the same host, then try to get the same rack data. + * multiple block managers can share the same host, followed by hosts on the same rack. */ private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { val locs = Random.shuffle(master.getLocations(blockId)) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 7afb45ae61cb..0d3e3cdbdb9f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -497,7 +497,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockManager = makeBlockManager(128, "exec", bmMaster) val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) - assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost)) + assert(locations.map(_.host) === Seq(localHost, localHost, otherHost)) } test("optimize a location order of blocks with topology information") { @@ -513,15 +513,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack)) val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack)) when(bmMaster.getLocations(mc.any[BlockId])) - .thenReturn(Seq(bmId1, bmId2, bmId3, bmId3, bmId4, bmId5)) + .thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4)) val blockManager = makeBlockManager(128, "exec", bmMaster) val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) - assert(locations.map(_.host).toSet - === Set(localHost, localHost, otherHost, otherHost, otherHost)) - assert(locations.flatMap(_.topologyInfo).toSet - === Set(localRack, localRack, localRack, otherRack, otherRack)) + assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost)) + assert(locations.flatMap(_.topologyInfo) + === Seq(localRack, localRack, localRack, otherRack, otherRack)) } test("SPARK-9591: getRemoteBytes from another location when Exception throw") { From a3e979f5e421b7c5d07811eee001f5b086166093 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Fri, 5 May 2017 13:47:29 +0800 Subject: [PATCH 3/3] address comments, and fix the test error --- .../scala/org/apache/spark/storage/BlockManager.scala | 10 +++++++--- .../org/apache/spark/storage/BlockManagerSuite.scala | 6 ++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5bdc37a8618b..33ce30c58e1a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -617,10 +617,14 @@ private[spark] class BlockManager( private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { val locs = Random.shuffle(master.getLocations(blockId)) val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } - val (sameRackLocs, differentRackLocs) = otherLocs.partition { - loc => blockManagerId.topologyInfo == loc.topologyInfo + blockManagerId.topologyInfo match { + case None => preferredLocs ++ otherLocs + case Some(_) => + val (sameRackLocs, differentRackLocs) = otherLocs.partition { + loc => blockManagerId.topologyInfo == loc.topologyInfo + } + preferredLocs ++ sameRackLocs ++ differentRackLocs } - preferredLocs ++ sameRackLocs ++ differentRackLocs } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d90902d92ab3..1e7bcdb6740f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -497,7 +497,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("optimize a location order of blocks without topology information") { - val localHost = Utils.localHostName() + val localHost = "localhost" val otherHost = "otherHost" val bmMaster = mock(classOf[BlockManagerMaster]) val bmId1 = BlockManagerId("id1", localHost, 1) @@ -512,7 +512,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("optimize a location order of blocks with topology information") { - val localHost = Utils.localHostName() + val localHost = "localhost" val otherHost = "otherHost" val localRack = "localRack" val otherRack = "otherRack" @@ -527,6 +527,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4)) val blockManager = makeBlockManager(128, "exec", bmMaster) + blockManager.blockManagerId = + BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack)) val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))