Commit 7fa960e

Davies Liu authored and JoshRosen committed
[SPARK-5363] Fix bug in PythonRDD: remove() inside iterator is not safe
Removing elements from a mutable HashSet while iterating over it can cause the iteration to incorrectly skip over entries that were not removed. If this happened, PythonRDD would write fewer broadcast variables than the Python worker was expecting to read, which would cause the Python worker to hang indefinitely.

Author: Davies Liu <[email protected]>

Closes #4776 from davies/fix_hang and squashes the following commits:

a4384a5 [Davies Liu] fix bug: remvoe() inside iterator is not safe
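
The fix described above can be illustrated in isolation. The following is a minimal sketch, not code from Spark: the object `SafeRemoval` and the helper `removeStale` are hypothetical names. It shows the general pattern of computing the set of ids to remove first, then iterating over that separate collection while mutating the original set.

```scala
import scala.collection.mutable

// Hypothetical helper for illustration only; not part of PythonRDD.
object SafeRemoval {
  // Removes every id in `oldBids` that is absent from `newBids`
  // and returns the ids that were removed.
  def removeStale(oldBids: mutable.Set[Long], newBids: Set[Long]): Set[Long] = {
    // Take an immutable snapshot so the loop below never iterates
    // over the collection it is mutating.
    val snapshot: Set[Long] = oldBids.toSet
    val toRemove: Set[Long] = snapshot.diff(newBids)
    for (bid <- toRemove) {
      oldBids.remove(bid)
    }
    toRemove
  }

  def main(args: Array[String]): Unit = {
    val oldBids = mutable.HashSet[Long](0L, 1L, 2L, 3L)
    val removed = removeStale(oldBids, Set(2L, 3L, 4L))
    println(removed.toList.sorted) // List(0, 1)
    println(oldBids.toList.sorted) // List(2, 3)
  }
}
```

Because the loop iterates over `toRemove` rather than `oldBids`, the number of ids visited always equals `toRemove.size`, which is the property the commit relies on when it writes the count before the entries.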
1 parent cfff397 commit 7fa960e

1 file changed (+6, -7 lines)

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 6 additions & 7 deletions
@@ -219,14 +219,13 @@ private[spark] class PythonRDD(
         val oldBids = PythonRDD.getWorkerBroadcasts(worker)
         val newBids = broadcastVars.map(_.id).toSet
         // number of different broadcasts
-        val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+        val toRemove = oldBids.diff(newBids)
+        val cnt = toRemove.size + newBids.diff(oldBids).size
         dataOut.writeInt(cnt)
-        for (bid <- oldBids) {
-          if (!newBids.contains(bid)) {
-            // remove the broadcast from worker
-            dataOut.writeLong(- bid - 1)  // bid >= 0
-            oldBids.remove(bid)
-          }
+        for (bid <- toRemove) {
+          // remove the broadcast from worker
+          dataOut.writeLong(- bid - 1)  // bid >= 0
+          oldBids.remove(bid)
         }
         for (broadcast <- broadcastVars) {
           if (!oldBids.contains(broadcast.id)) {
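
The hunk writes a removal as `- bid - 1`, which maps every non-negative id to a negative value. A minimal sketch of that encoding follows. The object `BroadcastWire` and both functions are hypothetical names, and the decoding side is an assumption: the hunk is truncated before the additions are written, so treating any non-negative value as an added id is a guess at the convention, not something this diff confirms.

```scala
// Hypothetical illustration of the sign-based encoding; not Spark's API.
object BroadcastWire {
  // Encode a removal. Since bid >= 0, the result is always negative.
  def encodeRemoval(bid: Long): Long = -bid - 1

  // Decode a value read from the stream.
  // Left(id)  = remove the broadcast with this id
  // Right(id) = assumed meaning: a broadcast with this id is being added
  def decode(value: Long): Either[Long, Long] =
    if (value < 0) Left(-value - 1) else Right(value)
}
```

Under this scheme id 0 encodes to -1, so removals and non-negative ids never collide. The reader must consume exactly as many entries as the `cnt` written beforehand, which is why a skipped removal would leave it waiting for data that never arrives.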