@@ -34,6 +34,7 @@ import org.apache.spark.metrics.MetricsSystem
3434import org .apache .spark .network .ConnectionManager
3535import org .apache .spark .scheduler .LiveListenerBus
3636import org .apache .spark .serializer .Serializer
37+ import org .apache .spark .shuffle .ShuffleManager
3738import org .apache .spark .storage ._
3839import org .apache .spark .util .{AkkaUtils , Utils }
3940
@@ -56,7 +57,7 @@ class SparkEnv (
5657 val closureSerializer : Serializer ,
5758 val cacheManager : CacheManager ,
5859 val mapOutputTracker : MapOutputTracker ,
59- val shuffleFetcher : ShuffleFetcher ,
60+ val shuffleManager : ShuffleManager ,
6061 val broadcastManager : BroadcastManager ,
6162 val blockManager : BlockManager ,
6263 val connectionManager : ConnectionManager ,
@@ -80,7 +81,7 @@ class SparkEnv (
8081 pythonWorkers.foreach { case (key, worker) => worker.stop() }
8182 httpFileServer.stop()
8283 mapOutputTracker.stop()
83- shuffleFetcher .stop()
84+ shuffleManager .stop()
8485 broadcastManager.stop()
8586 blockManager.stop()
8687 blockManager.master.stop()
@@ -163,13 +164,20 @@ object SparkEnv extends Logging {
163164 def instantiateClass [T ](propertyName : String , defaultClassName : String ): T = {
164165 val name = conf.get(propertyName, defaultClassName)
165166 val cls = Class .forName(name, true , Utils .getContextOrSparkClassLoader)
166- // First try with the constructor that takes SparkConf. If we can't find one,
167- // use a no-arg constructor instead.
167+ // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
168+ // SparkConf, then one taking no arguments
168169 try {
169- cls.getConstructor(classOf [SparkConf ]).newInstance(conf).asInstanceOf [T ]
170+ cls.getConstructor(classOf [SparkConf ], java.lang.Boolean .TYPE )
171+ .newInstance(conf, new java.lang.Boolean (isDriver))
172+ .asInstanceOf [T ]
170173 } catch {
171174 case _ : NoSuchMethodException =>
172- cls.getConstructor().newInstance().asInstanceOf [T ]
175+ try {
176+ cls.getConstructor(classOf [SparkConf ]).newInstance(conf).asInstanceOf [T ]
177+ } catch {
178+ case _ : NoSuchMethodException =>
179+ cls.getConstructor().newInstance().asInstanceOf [T ]
180+ }
173181 }
174182 }
175183
@@ -219,9 +227,6 @@ object SparkEnv extends Logging {
219227
220228 val cacheManager = new CacheManager (blockManager)
221229
222- val shuffleFetcher = instantiateClass[ShuffleFetcher ](
223- " spark.shuffle.fetcher" , " org.apache.spark.BlockStoreShuffleFetcher" )
224-
225230 val httpFileServer = new HttpFileServer (securityManager)
226231 httpFileServer.initialize()
227232 conf.set(" spark.fileserver.uri" , httpFileServer.serverUri)
@@ -242,6 +247,9 @@ object SparkEnv extends Logging {
242247 " ."
243248 }
244249
250+ val shuffleManager = instantiateClass[ShuffleManager ](
251+ " spark.shuffle.manager" , " org.apache.spark.shuffle.hash.HashShuffleManager" )
252+
245253 // Warn about deprecated spark.cache.class property
246254 if (conf.contains(" spark.cache.class" )) {
247255 logWarning(" The spark.cache.class property is no longer being used! Specify storage " +
@@ -255,7 +263,7 @@ object SparkEnv extends Logging {
255263 closureSerializer,
256264 cacheManager,
257265 mapOutputTracker,
258- shuffleFetcher ,
266+ shuffleManager ,
259267 broadcastManager,
260268 blockManager,
261269 connectionManager,
0 commit comments