From 75dd86553772ca840312e4225be44340f5830f81 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 4 Sep 2019 11:42:54 +0900
Subject: [PATCH] [SPARK-28967][CORE] Include cloned version of "properties" to avoid ConcurrentModificationException

---
Notes (free-form patch commentary placed after the three-dash separator; it is
not part of the commit message and does not touch any hunk below):

* DAGScheduler.scala: on the `partitions.isEmpty` path, the
  SparkListenerJobStart event is now posted with
  SerializationUtils.clone(properties) instead of the caller's own
  `properties` instance. Per the subject line, the goal is to avoid a
  ConcurrentModificationException.
* DAGSchedulerSuite.scala: adds one test that submits a job over a
  0-partition MyRDD and checks, inside SparkListener.onJobStart, that
  event.properties equals the submitted Properties but is a different
  instance (`ne`). The AtomicReference starts out holding a
  TestFailedException and is only cleared by a successful callback, so the
  final assertion also fails when no JobStart event is delivered.
* NOTE(review): `val waiter` in the new test is assigned but never read in
  this hunk; the binding looks droppable - confirm.
* NOTE(review): AtomicInteger and ThreadUtils are added to the import lists,
  but no added line in this patch references them - confirm they are needed.
* NOTE(review): the listener registered with sc.addSparkListener is not
  removed inside the test; presumably the suite recreates `sc` per test -
  verify against the suite's setup/teardown.
* NOTE(review): the From: header in this copy carries no e-mail address.

 .../apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala   | 35 +++++++++++++++++--
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b08483267c141..9df59459ca799 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -698,7 +698,7 @@ private[spark] class DAGScheduler(
     if (partitions.isEmpty) {
       val time = clock.getTimeMillis()
       listenerBus.post(
-        SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
+        SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
       listenerBus.post(
         SparkListenerJobEnd(jobId, time, JobSucceeded))
       // Return immediately if the job is running 0 tasks
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2b3423f9a4d40..cd854c379b08a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference}
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.util.control.NonFatal
 
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
+import org.scalatest.exceptions.TestFailedException
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
@@ -36,7 +37,7 @@ import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, ThreadUtils, Utils}
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -788,6 +789,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  test("SPARK-28967 properties must be cloned before posting to listener bus for 0 partition") {
+    val properties = new Properties()
+    val func = (context: TaskContext, it: Iterator[(_)]) => 1
+    val resultHandler = (taskIndex: Int, result: Int) => {}
+    val assertionError = new AtomicReference[TestFailedException](
+      new TestFailedException("Listener didn't receive expected JobStart event", 0))
+    val listener = new SparkListener() {
+      override def onJobStart(event: SparkListenerJobStart): Unit = {
+        try {
+          assert(event.properties.equals(properties), "Expected same content of properties, " +
+            s"but got properties with different content. props in caller ${properties} /" +
+            s" props in event ${event.properties}")
+          assert(event.properties.ne(properties), "Expected instance with different identity, " +
+            "but got same instance.")
+          assertionError.set(null)
+        } catch {
+          case e: TestFailedException => assertionError.set(e)
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+
+    // 0 partition
+    val testRdd = new MyRDD(sc, 0, Nil)
+    val waiter = scheduler.submitJob(testRdd, func, Seq.empty, CallSite.empty,
+      resultHandler, properties)
+    sc.listenerBus.waitUntilEmpty(1000L)
+    assert(assertionError.get() === null)
+  }
+
   // Helper function to validate state when creating tests for task failures
   private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
     assert(stageAttempt.stageId === stageId)