Skip to content

Commit 905b7f7

Browse files
HeartSaVioR authored and
srowen committed
[SPARK-28967][CORE] Include cloned version of "properties" to avoid ConcurrentModificationException
### What changes were proposed in this pull request? This patch fixes a bug that throws ConcurrentModificationException when a job with 0 partitions is submitted via DAGScheduler. ### Why are the changes needed? Without this patch, a structured streaming query throws ConcurrentModificationException, as in the stack trace below: ``` 19/09/04 09:48:49 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception java.util.ConcurrentModificationException at java.util.Hashtable$Enumerator.next(Hashtable.java:1387) at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424) at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:237) at scala.collection.TraversableLike.map$(TraversableLike.scala:230) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.util.JsonProtocol$.mapToJson(JsonProtocol.scala:514) at org.apache.spark.util.JsonProtocol$.$anonfun$propertiesToJson$1(JsonProtocol.scala:520) at scala.Option.map(Option.scala:163) at org.apache.spark.util.JsonProtocol$.propertiesToJson(JsonProtocol.scala:519) at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:155) at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:79) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:149) at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:217) at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37) at 
org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:99) at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:84) at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:102) at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:102) at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:97) at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:93) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319) at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:93) ``` Please refer to https://issues.apache.org/jira/browse/SPARK-28967 for a detailed reproducer. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Newly added UT. Also manually tested by running a simple structured streaming query in spark-shell. Closes #25672 from HeartSaVioR/SPARK-28967. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent 4664a08 commit 905b7f7

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,7 @@ private[spark] class DAGScheduler(
698698
if (partitions.isEmpty) {
699699
val time = clock.getTimeMillis()
700700
listenerBus.post(
701-
SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
701+
SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
702702
listenerBus.post(
703703
SparkListenerJobEnd(jobId, time, JobSucceeded))
704704
// Return immediately if the job is running 0 tasks

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package org.apache.spark.scheduler
1919

2020
import java.util.Properties
2121
import java.util.concurrent.{CountDownLatch, TimeUnit}
22-
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
22+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference}
2323

2424
import scala.annotation.meta.param
2525
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
2626
import scala.util.control.NonFatal
2727

2828
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
29+
import org.scalatest.exceptions.TestFailedException
2930
import org.scalatest.time.SpanSugar._
3031

3132
import org.apache.spark._
@@ -36,7 +37,7 @@ import org.apache.spark.rdd.{DeterministicLevel, RDD}
3637
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
3738
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
3839
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
39-
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
40+
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, ThreadUtils, Utils}
4041

4142
class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
4243
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -788,6 +789,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
788789
}
789790
}
790791

792+
test("SPARK-28967 properties must be cloned before posting to listener bus for 0 partition") {
793+
val properties = new Properties()
794+
val func = (context: TaskContext, it: Iterator[(_)]) => 1
795+
val resultHandler = (taskIndex: Int, result: Int) => {}
796+
val assertionError = new AtomicReference[TestFailedException](
797+
new TestFailedException("Listener didn't receive expected JobStart event", 0))
798+
val listener = new SparkListener() {
799+
override def onJobStart(event: SparkListenerJobStart): Unit = {
800+
try {
801+
assert(event.properties.equals(properties), "Expected same content of properties, " +
802+
s"but got properties with different content. props in caller ${properties} /" +
803+
s" props in event ${event.properties}")
804+
assert(event.properties.ne(properties), "Expected instance with different identity, " +
805+
"but got same instance.")
806+
assertionError.set(null)
807+
} catch {
808+
case e: TestFailedException => assertionError.set(e)
809+
}
810+
}
811+
}
812+
sc.addSparkListener(listener)
813+
814+
// 0 partition
815+
val testRdd = new MyRDD(sc, 0, Nil)
816+
val waiter = scheduler.submitJob(testRdd, func, Seq.empty, CallSite.empty,
817+
resultHandler, properties)
818+
sc.listenerBus.waitUntilEmpty(1000L)
819+
assert(assertionError.get() === null)
820+
}
821+
791822
// Helper function to validate state when creating tests for task failures
792823
private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
793824
assert(stageAttempt.stageId === stageId)

0 commit comments

Comments
 (0)