
Commit 20fcf3d

[SPARK-2977] Ensure ShuffleManager is created before ShuffleBlockManager
This is intended to fix SPARK-2977. Before, there was an implicit ordering dependency where we needed to know the ShuffleManager implementation before creating the ShuffleBlockManager. This patch makes that dependency explicit by adding ShuffleManager to a bunch of constructors.

I think it's a little odd for BlockManager to take a ShuffleManager only to pass it to ShuffleBlockManager without using it itself; there's an opportunity to clean this up later if we sever the circular dependencies between BlockManager and other components and pass those components to BlockManager's constructor.

Author: Josh Rosen <[email protected]>

Closes #1976 from JoshRosen/SPARK-2977 and squashes the following commits:

a9cd1e1 [Josh Rosen] [SPARK-2977] Ensure ShuffleManager is created before ShuffleBlockManager.
1 parent a83c772 commit 20fcf3d
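
The fragility the commit message describes is general: a component that infers a collaborator from configuration can be constructed before that collaborator exists, or disagree with it. Below is a hedged sketch of the wiring this patch establishes; the stub classes and the buildEnv helper are illustrative stand-ins, not Spark's actual signatures beyond what the diffs show.

// Illustrative sketch only; buildEnv and the trimmed constructors are
// hypothetical stand-ins for the wiring this commit changes.
trait ShuffleManager
class HashShuffleManager extends ShuffleManager

class BlockManager(shuffleManager: ShuffleManager) {
  // BlockManager does not use the manager itself; it only threads it through.
  val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
}

class ShuffleBlockManager(blockManager: BlockManager, shuffleManager: ShuffleManager)

object Wiring {
  def buildEnv(): BlockManager = {
    // The manager must exist before anything that depends on it; the new
    // constructor signatures make that ordering impossible to get wrong.
    val shuffleManager: ShuffleManager = new HashShuffleManager
    new BlockManager(shuffleManager)
  }
}

With the manager injected through constructors, the compiler enforces that SparkEnv instantiates the ShuffleManager before the BlockManager and ShuffleBlockManager that consume it.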

6 files changed: 37 additions & 26 deletions


core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 11 additions & 11 deletions

@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
       "MapOutputTracker",
       new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

+    // Let the user specify short names for shuffle managers
+    val shortShuffleMgrNames = Map(
+      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
+    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
+
+    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, mapOutputTracker)
+      serializer, conf, securityManager, mapOutputTracker, shuffleManager)

     val connectionManager = blockManager.connectionManager

@@ -250,16 +260,6 @@
       "."
     }

-    // Let the user specify short names for shuffle managers
-    val shortShuffleMgrNames = Map(
-      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
-    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
-    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
-    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
-
-    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
-
     // Warn about deprecated spark.cache.class property
     if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 7 additions & 4 deletions

@@ -33,6 +33,7 @@ import org.apache.spark.executor._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.util._

 private[spark] sealed trait BlockValues

@@ -57,11 +58,12 @@ private[spark] class BlockManager(
     maxMemory: Long,
     val conf: SparkConf,
     securityManager: SecurityManager,
-    mapOutputTracker: MapOutputTracker)
+    mapOutputTracker: MapOutputTracker,
+    shuffleManager: ShuffleManager)
   extends Logging {

   private val port = conf.getInt("spark.blockManager.port", 0)
-  val shuffleBlockManager = new ShuffleBlockManager(this)
+  val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
   val connectionManager =

@@ -142,9 +144,10 @@ private[spark] class BlockManager(
       serializer: Serializer,
       conf: SparkConf,
       securityManager: SecurityManager,
-      mapOutputTracker: MapOutputTracker) = {
+      mapOutputTracker: MapOutputTracker,
+      shuffleManager: ShuffleManager) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, securityManager, mapOutputTracker)
+      conf, securityManager, mapOutputTracker, shuffleManager)
   }

   /**
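
The last hunk threads the new parameter through BlockManager's auxiliary constructor, which derives maxMemory and delegates to the primary constructor. A stripped-down illustration of that forwarding pattern follows; BlockManagerSketch is hypothetical and the real constructor takes several more parameters.

// Simplified illustration of the auxiliary-constructor forwarding above.
trait ShuffleManager

class BlockManagerSketch(
    execId: String,
    maxMemory: Long,
    shuffleManager: ShuffleManager) {

  // Auxiliary constructor: derive maxMemory, then forward every dependency,
  // including the new shuffleManager, to the primary constructor.
  def this(execId: String, shuffleManager: ShuffleManager) =
    this(execId, Runtime.getRuntime.maxMemory, shuffleManager)
}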

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 4 additions & 3 deletions

@@ -25,6 +25,7 @@ import scala.collection.JavaConversions._

 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}

@@ -62,7 +63,8 @@ private[spark] trait ShuffleWriterGroup {
  */
 // TODO: Factor this into a separate class for each ShuffleManager implementation
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
+class ShuffleBlockManager(blockManager: BlockManager,
+                          shuffleManager: ShuffleManager) extends Logging {
   def conf = blockManager.conf

   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.

@@ -71,8 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     conf.getBoolean("spark.shuffle.consolidateFiles", false)

   // Are we using sort-based shuffle?
-  val sortBasedShuffle =
-    conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
+  val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager]

   private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
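
One consequence of the short names handled in SparkEnv.scala is that the old string comparison here could no longer be trusted: a user who sets spark.shuffle.manager to "sort" would never match the fully qualified class name, so sortBasedShuffle stayed false even though sort-based shuffle was active. Testing the type of the already-instantiated manager is spelling-independent. A self-contained demonstration follows; the Map stands in for SparkConf and the stub classes mirror the ones in the diff.

// Self-contained demonstration; Map stands in for SparkConf, and the stub
// classes below mirror the real ones only in name.
trait ShuffleManager
class SortShuffleManager extends ShuffleManager

object SortShuffleCheck extends App {
  val conf = Map("spark.shuffle.manager" -> "sort")  // short name, as SparkEnv now allows

  // Old check: compare the raw config string to the fully qualified class name.
  val oldCheck =
    conf.getOrElse("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
  println(oldCheck)  // false, even though sort-based shuffle was requested

  // New check: ask the already-constructed manager for its actual type.
  val shuffleManager: ShuffleManager = new SortShuffleManager
  println(shuffleManager.isInstanceOf[SortShuffleManager])  // true
}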

core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala

Lines changed: 2 additions & 1 deletion

@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.util.concurrent.ArrayBlockingQueue

 import akka.actor._
+import org.apache.spark.shuffle.hash.HashShuffleManager
 import util.Random

 import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}

@@ -101,7 +102,7 @@ private[spark] object ThreadingTest {
       conf)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
-      new SecurityManager(conf), new MapOutputTrackerMaster(conf))
+      new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf))
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
     producers.foreach(_.start)

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 7 additions & 5 deletions

@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
 import akka.actor._
 import akka.pattern.ask
 import akka.util.Timeout
+import org.apache.spark.shuffle.hash.HashShuffleManager

 import org.mockito.invocation.InvocationOnMock
 import org.mockito.Matchers.any

@@ -61,6 +62,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   conf.set("spark.authenticate", "false")
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
+  val shuffleManager = new HashShuffleManager(conf)

   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   conf.set("spark.kryoserializer.buffer.mb", "1")

@@ -71,8 +73,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)

   private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
-    new BlockManager(
-      name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker)
+    new BlockManager(name, actorSystem, master, serializer, maxMem, conf, securityMgr,
+      mapOutputTracker, shuffleManager)
   }

   before {

@@ -791,7 +793,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf,
-      securityMgr, mapOutputTracker)
+      securityMgr, mapOutputTracker, shuffleManager)

     // The put should fail since a1 is not serializable.
     class UnserializableClass

@@ -1007,7 +1009,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter

   test("return error message when error occurred in BlockManagerWorker#onBlockMessageReceive") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
+      securityMgr, mapOutputTracker, shuffleManager)

     val worker = spy(new BlockManagerWorker(store))
     val connManagerId = mock(classOf[ConnectionManagerId])

@@ -1054,7 +1056,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter

   test("return ack message when no error occurred in BlocManagerWorker#onBlockMessageReceive") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
+      securityMgr, mapOutputTracker, shuffleManager)

     val worker = spy(new BlockManagerWorker(store))
     val connManagerId = mock(classOf[ConnectionManagerId])

core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala

Lines changed: 6 additions & 2 deletions

@@ -19,6 +19,8 @@ package org.apache.spark.storage

 import java.io.{File, FileWriter}

+import org.apache.spark.shuffle.hash.HashShuffleManager
+
 import scala.collection.mutable
 import scala.language.reflectiveCalls

@@ -42,7 +44,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
   // so we coerce consolidation if not already enabled.
   testConf.set("spark.shuffle.consolidateFiles", "true")

-  val shuffleBlockManager = new ShuffleBlockManager(null) {
+  private val shuffleManager = new HashShuffleManager(testConf.clone)
+
+  val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) {
     override def conf = testConf.clone
     var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
     override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)

@@ -148,7 +152,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
       confCopy)
     val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
-      securityManager, null)
+      securityManager, null, shuffleManager)

     try {
