
Commit c542297

Minchu Yang (rmcyang) authored and committed
[SPARK-36705][FOLLOW-UP] Support the case when user's classes need to register for Kryo serialization
### What changes were proposed in this pull request?

- Make the val lazy wherever `isPushBasedShuffleEnabled` is assigned to a class instance variable, so that the check runs only after the user-defined jars/classes in `spark.kryo.classesToRegister` have been downloaded and are available on the executor side. This is part of the fix for the exception shown below.
- Add a `checkSerializer` flag to control whether `isPushBasedShuffleEnabled` needs to verify that the serializer `supportsRelocationOfSerializedObjects`, also as part of the fix for the exception shown below. Specifically, the check is skipped in `registerWithExternalShuffleServer()` in `BlockManager` and in `createLocalDirsForMergedShuffleBlocks()` in `DiskBlockManager.scala`, since the same issue would otherwise arise there.
- Move `instantiateClassFromConf` and `instantiateClass` from `SparkEnv` into `Utils`, so that `isPushBasedShuffleEnabled` can leverage them to instantiate serializer instances.

### Why are the changes needed?

When a user registers classes for Kryo serialization via `spark.kryo.classesToRegister`, an exception like the one below is thrown from `isPushBasedShuffleEnabled`. The issue was reproduced in our internal branch by launching spark-shell as:

```
spark-shell --spark-version 3.1.1 --packages ml.dmlc:xgboost4j_2.12:1.3.1 --conf spark.kryo.classesToRegister=ml.dmlc.xgboost4j.scala.Booster
```

```
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393)
    at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:83)
    at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:183)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:230)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:171)
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:102)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:109)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:346)
    at org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:446)
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:253)
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:249)
    at org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2584)
    at org.apache.spark.MapOutputTrackerWorker.<init>(MapOutputTracker.scala:1109)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:322)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:205)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:442)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
    ... 4 more
Caused by: java.lang.ClassNotFoundException: ml.dmlc.xgboost4j.scala.Booster
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:217)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:174)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:173)
    ... 24 more
```

Registration of user classes for Kryo serialization happens after the serializer is created in SparkEnv, but serializer creation can also happen inside `isPushBasedShuffleEnabled`, which is called in some places before SparkEnv is created. Also, per the analysis by JoshRosen, Kryo instantiation was likely failing because the added packages had not yet been downloaded to the executor (this code runs during executor startup, not task startup). The proposed change fixes this issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passed existing tests. Also tested this patch in our internal branch where the user reported the issue; with this patch the issue is no longer reproducible.

Closes #34158 from rmcyang/SPARK-33781-bugFix.

Lead-authored-by: Minchu Yang <[email protected]>
Co-authored-by: Minchu Yang <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit e5b01cd)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
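For readers less familiar with Scala's `lazy val`, here is a minimal standalone sketch (illustrative only, not Spark code; all names are made up) of why deferring the field's evaluation fixes the startup failure: an eager `val` runs the serializer check while the class is being constructed at executor startup, before user-added jars are downloaded, whereas a `lazy val` postpones it until the field is first read, e.g. when the first task runs.

```scala
// Minimal standalone sketch (not Spark code) of the lazy-val idea behind the fix.
object LazyInitSketch {
  // Stand-in for "user jars from spark.kryo.classesToRegister have been downloaded".
  @volatile var userJarsAvailable = false

  class TrackerWorker {
    // An eager `val` here would call checkSerializer() during construction, i.e. at
    // "executor startup", and fail. The `lazy val` defers the call to first access.
    lazy val fetchMergeResult: Boolean = checkSerializer()

    private def checkSerializer(): Boolean = {
      if (!userJarsAvailable) {
        // Mirrors the ClassNotFoundException seen in the stack trace above.
        throw new ClassNotFoundException("ml.dmlc.xgboost4j.scala.Booster")
      }
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val worker = new TrackerWorker()   // "executor startup": nothing evaluated yet
    userJarsAvailable = true           // user-added jars become available
    println(worker.fetchMergeResult)   // "first task": lazy val evaluates safely -> true
  }
}
```

The actual change in `MapOutputTrackerWorker` (see the diff below) applies the same idea to `fetchMergeResult`.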
1 parent 8ffe00e commit c542297

20 files changed (+117, −74 lines)

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 1 deletion
@@ -163,7 +163,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   }
 
   private def canShuffleMergeBeEnabled(): Boolean = {
-    val isPushShuffleEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf)
+    val isPushShuffleEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf,
+      // invoked at driver
+      isDriver = true)
     if (isPushShuffleEnabled && rdd.isBarrier()) {
       logWarning("Push-based shuffle is currently not supported for barrier stages")
     }

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 6 additions & 2 deletions
@@ -617,7 +617,7 @@ private[spark] class MapOutputTrackerMaster(
   private val mapOutputTrackerMasterMessages =
     new LinkedBlockingQueue[MapOutputTrackerMasterMessage]
 
-  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf)
+  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver = true)
 
   // Thread pool used for handling map output status requests. This is a separate thread pool
   // to ensure we don't block the normal dispatcher threads.
@@ -1126,7 +1126,11 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   val mergeStatuses: Map[Int, Array[MergeStatus]] =
     new ConcurrentHashMap[Int, Array[MergeStatus]]().asScala
 
-  private val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf)
+  // This must be lazy to ensure that it is initialized when the first task is run and not at
+  // executor startup time. At startup time, user-added libraries may not have been
+  // downloaded to the executor, causing `isPushBasedShuffleEnabled` to fail when it tries to
+  // instantiate a serializer. See the followup to SPARK-36705 for more details.
+  private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
   /**
    * A [[KeyLock]] whose key is a shuffle id to ensure there is only one thread fetching

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 29 deletions
@@ -272,33 +272,7 @@ object SparkEnv extends Logging {
       conf.set(DRIVER_PORT, rpcEnv.address.port)
     }
 
-    // Create an instance of the class with the given name, possibly initializing it with our conf
-    def instantiateClass[T](className: String): T = {
-      val cls = Utils.classForName(className)
-      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
-      // SparkConf, then one taking no arguments
-      try {
-        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
-          .newInstance(conf, java.lang.Boolean.valueOf(isDriver))
-          .asInstanceOf[T]
-      } catch {
-        case _: NoSuchMethodException =>
-          try {
-            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
-          } catch {
-            case _: NoSuchMethodException =>
-              cls.getConstructor().newInstance().asInstanceOf[T]
-          }
-      }
-    }
-
-    // Create an instance of the class named by the given SparkConf property
-    // if the property is not set, possibly initializing it with our conf
-    def instantiateClassFromConf[T](propertyName: ConfigEntry[String]): T = {
-      instantiateClass[T](conf.get(propertyName))
-    }
-
-    val serializer = instantiateClassFromConf[Serializer](SERIALIZER)
+    val serializer = Utils.instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver)
     logDebug(s"Using serializer: ${serializer.getClass}")
 
     val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
@@ -337,7 +311,8 @@ object SparkEnv extends Logging {
     val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
     val shuffleMgrClass =
       shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
-    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
+    val shuffleManager = Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
+      shuffleMgrClass, conf, isDriver)
 
     val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
 
@@ -370,7 +345,7 @@ object SparkEnv extends Logging {
           } else {
             None
           }, blockManagerInfo,
-          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], isDriver)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -253,7 +253,7 @@ private[spark] class DAGScheduler(
   private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
-  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)
+  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf, isDriver = true)
 
   private val blockManagerMasterDriverHeartbeatTimeout =
     sc.getConf.get(config.STORAGE_BLOCKMANAGER_MASTER_DRIVER_HEARTBEAT_TIMEOUT).millis

core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala

Lines changed: 3 additions & 2 deletions
@@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 
-import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv}
+import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -463,7 +463,8 @@ private[spark] object ShuffleBlockPusher {
 
   private val BLOCK_PUSHER_POOL: ExecutorService = {
     val conf = SparkEnv.get.conf
-    if (Utils.isPushBasedShuffleEnabled(conf)) {
+    if (Utils.isPushBasedShuffleEnabled(conf,
+        isDriver = SparkContext.DRIVER_IDENTIFIER == SparkEnv.get.executorId)) {
       val numThreads = conf.get(SHUFFLE_NUM_PUSH_THREADS)
         .getOrElse(conf.getInt(SparkLauncher.EXECUTOR_CORES, 1))
       ThreadUtils.newDaemonFixedThreadPool(numThreads, "shuffle-block-push-thread")

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 5 additions & 4 deletions
@@ -184,6 +184,7 @@ private[spark] class BlockManager(
 
   // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
   private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined
+  private val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
   private val remoteReadNioBufferConversion =
     conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)
@@ -193,8 +194,8 @@ private[spark] class BlockManager(
   val diskBlockManager = {
     // Only perform cleanup if an external service is not serving our shuffle files.
     val deleteFilesOnStop =
-      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
-    new DiskBlockManager(conf, deleteFilesOnStop)
+      !externalShuffleServiceEnabled || isDriver
+    new DiskBlockManager(conf, deleteFilesOnStop = deleteFilesOnStop, isDriver = isDriver)
   }
 
   // Visible for testing
@@ -533,7 +534,7 @@ private[spark] class BlockManager(
     hostLocalDirManager = {
       if ((conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
           !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) ||
-          Utils.isPushBasedShuffleEnabled(conf)) {
+          Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
         Some(new HostLocalDirManager(
           futureExecutionContext,
           conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE),
@@ -559,7 +560,7 @@ private[spark] class BlockManager(
   private def registerWithExternalShuffleServer(): Unit = {
     logInfo("Registering executor with local external shuffle service.")
     val shuffleManagerMeta =
-      if (Utils.isPushBasedShuffleEnabled(conf)) {
+      if (Utils.isPushBasedShuffleEnabled(conf, isDriver = isDriver, checkSerializer = false)) {
        s"${shuffleManager.getClass.getName}:" +
          s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}"
      } else {

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 3 additions & 2 deletions
@@ -51,7 +51,8 @@ class BlockManagerMasterEndpoint(
     listenerBus: LiveListenerBus,
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
     blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
-    mapOutputTracker: MapOutputTrackerMaster)
+    mapOutputTracker: MapOutputTrackerMaster,
+    isDriver: Boolean)
   extends IsolatedRpcEndpoint with Logging {
 
   // Mapping from executor id to the block manager's local disk directories.
@@ -100,7 +101,7 @@ class BlockManagerMasterEndpoint(
 
   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
 
-  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf)
+  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver)
 
   logInfo("BlockManagerMasterEndpoint up")
   // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 5 additions & 2 deletions
@@ -44,7 +44,10 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  *
  * ShuffleDataIO also can change the behavior of deleteFilesOnStop.
  */
-private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
+private[spark] class DiskBlockManager(
+    conf: SparkConf,
+    var deleteFilesOnStop: Boolean,
+    isDriver: Boolean)
   extends Logging {
 
   private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
@@ -207,7 +210,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
    * permission to create directories under application local directories.
    */
   private def createLocalDirsForMergedShuffleBlocks(): Unit = {
-    if (Utils.isPushBasedShuffleEnabled(conf)) {
+    if (Utils.isPushBasedShuffleEnabled(conf, isDriver = isDriver, checkSerializer = false)) {
      // Will create the merge_manager directory only if it doesn't exist under the local dir.
      Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
        try {

core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala

Lines changed: 1 addition & 1 deletion
@@ -142,7 +142,7 @@ private class PushBasedFetchHelper(
     val mergedBlocksMetaListener = new MergedBlocksMetaListener {
       override def onSuccess(shuffleId: Int, shuffleMergeId: Int, reduceId: Int,
         meta: MergedBlockMeta): Unit = {
-        logInfo(s"Received the meta of push-merged block for ($shuffleId, $shuffleMergeId," +
+        logDebug(s"Received the meta of push-merged block for ($shuffleId, $shuffleMergeId," +
           s" $reduceId) from ${req.address.host}:${req.address.port}")
         try {
           iterator.addToResultsQueue(PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId,

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 55 additions & 10 deletions
@@ -2597,18 +2597,31 @@ private[spark] object Utils extends Logging {
    * - IO encryption disabled
    * - serializer(such as KryoSerializer) supports relocation of serialized objects
    */
-  def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
+  def isPushBasedShuffleEnabled(conf: SparkConf,
+      isDriver: Boolean,
+      checkSerializer: Boolean = true): Boolean = {
     val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
     if (pushBasedShuffleEnabled) {
-      val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf])
-        .newInstance(conf).asInstanceOf[Serializer]
-      val canDoPushBasedShuffle = conf.get(IS_TESTING).getOrElse(false) ||
-        (conf.get(SHUFFLE_SERVICE_ENABLED) &&
-          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" &&
-          // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
-          !conf.get(IO_ENCRYPTION_ENABLED) &&
-          serializer.supportsRelocationOfSerializedObjects)
-
+      val canDoPushBasedShuffle = {
+        val isTesting = conf.get(IS_TESTING).getOrElse(false)
+        val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
+          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
+        lazy val serializerIsSupported = {
+          if (checkSerializer) {
+            Option(SparkEnv.get)
+              .map(_.serializer)
+              .filter(_ != null)
+              .getOrElse(instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver))
+              .supportsRelocationOfSerializedObjects
+          } else {
+            // if no need to check Serializer, always set serializerIsSupported as true
+            true
+          }
+        }
+        // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
+        val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
+        (isShuffleServiceAndYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported
+      }
       if (!canDoPushBasedShuffle) {
         logWarning("Push-based shuffle can only be enabled when the application is submitted " +
           "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, " +
@@ -2621,6 +2634,38 @@
     }
   }
 
+  // Create an instance of Serializer or ShuffleManager with the given name,
+  // possibly initializing it with our conf
+  def instantiateSerializerOrShuffleManager[T](className: String,
+      conf: SparkConf,
+      isDriver: Boolean): T = {
+    val cls = Utils.classForName(className)
+    // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
+    // SparkConf, then one taking no arguments
+    try {
+      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
+        .newInstance(conf, java.lang.Boolean.valueOf(isDriver))
+        .asInstanceOf[T]
+    } catch {
+      case _: NoSuchMethodException =>
+        try {
+          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
+        } catch {
+          case _: NoSuchMethodException =>
+            cls.getConstructor().newInstance().asInstanceOf[T]
+        }
+    }
+  }
+
+  // Create an instance of Serializer named by the given SparkConf property
+  // if the property is not set, possibly initializing it with our conf
+  def instantiateSerializerFromConf[T](propertyName: ConfigEntry[String],
+      conf: SparkConf,
+      isDriver: Boolean): T = {
+    instantiateSerializerOrShuffleManager[T](
+      conf.get(propertyName), conf, isDriver)
+  }
+
   /**
    * Return whether dynamic allocation is enabled in the given conf.
    */
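To make the new call pattern concrete, here is a hedged call-site fragment (hypothetical, not part of this diff; `conf` stands for an existing `SparkConf`) showing how callers choose `isDriver` and when they skip the serializer check:

```scala
// Driver-side callers keep the default serializer check, since user jars are
// already available on the driver classpath:
val enabledOnDriver = Utils.isPushBasedShuffleEnabled(conf, isDriver = true)

// Callers that run during executor startup, before jars from
// spark.kryo.classesToRegister have been downloaded, skip the serializer check
// so that no Kryo instance is created too early:
val enabledAtStartup =
  Utils.isPushBasedShuffleEnabled(conf, isDriver = false, checkSerializer = false)
```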
