Skip to content

Commit 571acc8

Browse files
viiryadongjoon-hyun
authored andcommitted
[SPARK-34939][CORE] Throw fetch failure exception when unable to deserialize broadcasted map statuses
### What changes were proposed in this pull request? This patch catches `IOException`, which is possibly thrown due to unable to deserialize map statuses (e.g., broadcasted value is destroyed), when deserilizing map statuses. Once `IOException` is caught, `MetadataFetchFailedException` is thrown to let Spark handle it. ### Why are the changes needed? One customer encountered application error. From the log, it is caused by accessing non-existing broadcasted value. The broadcasted value is map statuses. E.g., ``` [info] Cause: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 [info] at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1410) [info] at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226) [info] at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103) [info] at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) [info] at org.apache.spark.MapOutputTracker$.$anonfun$deserializeMapStatuses$3(MapOutputTracker.scala:967) [info] at org.apache.spark.internal.Logging.logInfo(Logging.scala:57) [info] at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56) [info] at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:887) [info] at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:967) ``` There is a race-condition. After map statuses are broadcasted and the executors obtain serialized broadcasted map statuses. If any fetch failure happens after, Spark scheduler invalidates cached map statuses and destroy broadcasted value of the map statuses. Then any executor trying to deserialize serialized broadcasted map statuses and access broadcasted value, `IOException` will be thrown. Currently we don't catch it in `MapOutputTrackerWorker` and above exception will fail the application. Normally we should throw a fetch failure exception for such case. Spark scheduler will handle this. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test. Closes #32033 from viirya/fix-broadcast-master. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent ebf01ec commit 571acc8

File tree

2 files changed

+64
-10
lines changed

2 files changed

+64
-10
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark
1919

20-
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
20+
import java.io.{ByteArrayInputStream, IOException, ObjectInputStream, ObjectOutputStream}
2121
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
2222
import java.util.concurrent.locks.ReentrantReadWriteLock
2323

@@ -100,7 +100,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
100100
* broadcast variable in order to keep it from being garbage collected and to allow for it to be
101101
* explicitly destroyed later on when the ShuffleMapStage is garbage-collected.
102102
*/
103-
private[this] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
103+
private[spark] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
104104

105105
/**
106106
* Counter tracking the number of partitions that have output. This is a performance optimization
@@ -843,7 +843,14 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
843843
if (fetchedStatuses == null) {
844844
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
845845
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
846-
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
846+
try {
847+
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
848+
} catch {
849+
case e: SparkException =>
850+
throw new MetadataFetchFailedException(shuffleId, -1,
851+
s"Unable to deserialize broadcasted map statuses for shuffle $shuffleId: " +
852+
e.getCause)
853+
}
847854
logInfo("Got the output locations")
848855
mapStatuses.put(shuffleId, fetchedStatuses)
849856
}
@@ -953,13 +960,19 @@ private[spark] object MapOutputTracker extends Logging {
953960
case DIRECT =>
954961
deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]]
955962
case BROADCAST =>
956-
// deserialize the Broadcast, pull .value array out of it, and then deserialize that
957-
val bcast = deserializeObject(bytes, 1, bytes.length - 1).
958-
asInstanceOf[Broadcast[Array[Byte]]]
959-
logInfo("Broadcast mapstatuses size = " + bytes.length +
960-
", actual size = " + bcast.value.length)
961-
// Important - ignore the DIRECT tag ! Start from offset 1
962-
deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
963+
try {
964+
// deserialize the Broadcast, pull .value array out of it, and then deserialize that
965+
val bcast = deserializeObject(bytes, 1, bytes.length - 1).
966+
asInstanceOf[Broadcast[Array[Byte]]]
967+
logInfo("Broadcast mapstatuses size = " + bytes.length +
968+
", actual size = " + bcast.value.length)
969+
// Important - ignore the DIRECT tag ! Start from offset 1
970+
deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
971+
} catch {
972+
case e: IOException =>
973+
logWarning("Exception encountered during deserializing broadcasted map statuses: ", e)
974+
throw new SparkException("Unable to deserialize broadcasted map statuses", e)
975+
}
963976
case _ => throw new IllegalArgumentException("Unexpected byte tag = " + bytes(0))
964977
}
965978
}

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,4 +332,45 @@ class MapOutputTrackerSuite extends SparkFunSuite {
332332
rpcEnv.shutdown()
333333
}
334334

335+
test("SPARK-34939: remote fetch using broadcast if broadcasted value is destroyed") {
336+
val newConf = new SparkConf
337+
newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
338+
newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
339+
newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 10240L) // 10 KiB << 1MiB framesize
340+
341+
// needs TorrentBroadcast so need a SparkContext
342+
withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
343+
val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
344+
val rpcEnv = sc.env.rpcEnv
345+
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
346+
rpcEnv.stop(masterTracker.trackerEndpoint)
347+
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
348+
349+
masterTracker.registerShuffle(20, 100)
350+
(0 until 100).foreach { i =>
351+
masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
352+
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5))
353+
}
354+
355+
val mapWorkerRpcEnv = createRpcEnv("spark-worker", "localhost", 0, new SecurityManager(conf))
356+
val mapWorkerTracker = new MapOutputTrackerWorker(conf)
357+
mapWorkerTracker.trackerEndpoint =
358+
mapWorkerRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
359+
360+
val fetchedBytes = mapWorkerTracker.trackerEndpoint
361+
.askSync[Array[Byte]](GetMapOutputStatuses(20))
362+
assert(fetchedBytes(0) == 1)
363+
364+
// Normally `unregisterMapOutput` triggers the destroy of broadcasted value.
365+
// But the timing of destroying broadcasted value is indeterminate, we manually destroy
366+
// it by blocking.
367+
masterTracker.shuffleStatuses.get(20).foreach { shuffleStatus =>
368+
shuffleStatus.cachedSerializedBroadcast.destroy(true)
369+
}
370+
val err = intercept[SparkException] {
371+
MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
372+
}
373+
assert(err.getMessage.contains("Unable to deserialize broadcasted map statuses"))
374+
}
375+
}
335376
}

0 commit comments

Comments
 (0)