From dea3952eca12fd1abe90d7b2274989e95bf082b6 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 19 Sep 2019 07:30:50 +0900 Subject: [PATCH 1/4] [MINOR][CORE][SQL][STREAMING] Unify default wait time for waitUntilEmpty --- .../scala/org/apache/spark/TestUtils.scala | 2 +- .../spark/scheduler/LiveListenerBus.scala | 11 ++++++++++ .../org/apache/spark/AccumulatorSuite.scala | 2 +- .../ExecutorAllocationManagerSuite.scala | 2 +- .../scala/org/apache/spark/ShuffleSuite.scala | 2 +- .../apache/spark/SparkContextInfoSuite.scala | 2 +- .../org/apache/spark/SparkContextSuite.scala | 2 +- .../spark/deploy/LogUrlsStandaloneSuite.scala | 7 ++----- .../metrics/InputOutputMetricsSuite.scala | 8 +++---- .../CoarseGrainedSchedulerBackendSuite.scala | 6 ++---- .../spark/scheduler/DAGSchedulerSuite.scala | 21 ++++++++----------- .../spark/scheduler/SparkListenerSuite.scala | 9 +++----- .../apache/spark/sql/CachedTableSuite.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- .../spark/sql/FileBasedDataSourceSuite.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- .../apache/spark/sql/SessionStateSuite.scala | 6 +++--- .../scala/org/apache/spark/sql/UDFSuite.scala | 6 +++--- .../FileDataSourceV2FallBackSuite.scala | 2 +- .../execution/metric/SQLMetricsSuite.scala | 2 +- .../ui/SQLAppStatusListenerSuite.scala | 4 ++-- .../StreamingQueryListenerSuite.scala | 6 +++--- .../StreamingQueryListenersConfSuite.scala | 2 +- .../sql/test/DataFrameReaderWriterSuite.scala | 10 ++++----- .../sql/util/DataFrameCallbackSuite.scala | 18 ++++++++-------- .../util/ExecutionListenerManagerSuite.scala | 4 ++-- .../streaming/StreamingListenerSuite.scala | 2 +- 27 files changed, 73 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 41ae3ae3b758a..9aeb128fcc491 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -264,7 +264,7 @@ private[spark] object TestUtils { try { body(listener) } finally { - sc.listenerBus.waitUntilEmpty(TimeUnit.SECONDS.toMillis(10)) + sc.listenerBus.waitUntilEmpty() sc.listenerBus.removeListener(listener) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 302ebd30da228..fc8f548bd851d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -186,6 +186,17 @@ private[spark] class LiveListenerBus(conf: SparkConf) { metricsSystem.registerSource(metrics) } + /** + * For testing only. Wait until there are no more events in the queue, or until the default + * wait time has elapsed. Throw `TimeoutException` if the default wait time elapsed before the + * queue emptied. + * Exposed for testing. + */ + @throws(classOf[TimeoutException]) + def waitUntilEmpty(): Unit = { + waitUntilEmpty(TimeUnit.SECONDS.toMillis(10)) + } + /** * For testing only. Wait until there are no more events in the queue, or until the specified * time has elapsed. 
Throw `TimeoutException` if the specified time elapsed before the queue diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 435665d8a1ce2..a75cf3f0381df 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -126,7 +126,7 @@ private[spark] object AccumulatorSuite { sc.addSparkListener(listener) testBody // wait until all events have been processed before proceeding to assert things - sc.listenerBus.waitUntilEmpty(10 * 1000) + sc.listenerBus.waitUntilEmpty() val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values) val isSet = accums.exists { a => a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 07fb323cfc355..460714f204a3a 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -64,7 +64,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def post(event: SparkListenerEvent): Unit = { listenerBus.post(event) - listenerBus.waitUntilEmpty(1000) + listenerBus.waitUntilEmpty() } test("initialize dynamic allocation in SparkContext") { diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 923c9c90447fd..4c9d7e9faeb2a 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -498,7 +498,7 @@ object ShuffleSuite { job - sc.listenerBus.waitUntilEmpty(500) + sc.listenerBus.waitUntilEmpty() AggregatedShuffleMetrics(recordsWritten, recordsRead, bytesWritten, bytesRead) } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala index 536b4aec75623..09510edcb4499 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala @@ -63,7 +63,7 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext { val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() assert(sc.getRDDStorageInfo.length === 0) rdd.collect() - sc.listenerBus.waitUntilEmpty(10000) + sc.listenerBus.waitUntilEmpty() eventually(timeout(10.seconds), interval(100.milliseconds)) { assert(sc.getRDDStorageInfo.length === 1) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 786f55c96a3e8..eca49e5efdbbb 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -716,7 +716,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } x }.collect() - sc.listenerBus.waitUntilEmpty(10000) + sc.listenerBus.waitUntilEmpty() // As executors will send the metrics of running tasks via heartbeat, we can use this to check // whether there is any running task. 
eventually(timeout(10.seconds)) { diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index cbdf1755b0c5b..d953211c13b1d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -29,9 +29,6 @@ import org.apache.spark.util.SparkConfWithEnv class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { - /** Length of time to wait while draining listener events. */ - private val WAIT_TIMEOUT_MILLIS = 10000 - test("verify that correct log urls get propagated from workers") { sc = new SparkContext("local-cluster[2,1,1024]", "test") @@ -41,7 +38,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { // Trigger a job so that executors get added sc.parallelize(1 to 100, 4).map(_.toString).count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.addedExecutorInfos.values.foreach { info => assert(info.logUrlMap.nonEmpty) // Browse to each URL to check that it's valid @@ -61,7 +58,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { // Trigger a job so that executors get added sc.parallelize(1 to 100, 4).map(_.toString).count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo] assert(listeners.size === 1) val listener = listeners(0) diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index c7bd0c905d027..f9511045ee89e 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -182,7 +182,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext .reduceByKey(_ + _) .saveAsTextFile(tmpFile.toURI.toString) - sc.listenerBus.waitUntilEmpty(500) + sc.listenerBus.waitUntilEmpty() assert(inputRead == numRecords) assert(outputWritten == numBuckets) @@ -243,7 +243,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext val taskMetrics = new ArrayBuffer[Long]() // Avoid receiving earlier taskEnd events - sc.listenerBus.waitUntilEmpty(500) + sc.listenerBus.waitUntilEmpty() sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { @@ -253,7 +253,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext job - sc.listenerBus.waitUntilEmpty(500) + sc.listenerBus.waitUntilEmpty() taskMetrics.sum } @@ -293,7 +293,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext try { rdd.saveAsTextFile(outPath.toString) - sc.listenerBus.waitUntilEmpty(500) + sc.listenerBus.waitUntilEmpty() assert(taskBytesWritten.length == 2) val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS") taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) => diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 3edbbeb9c08f1..f1fa819810bf3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -46,8 +46,6 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, Utils} class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with Eventually { - private val executorUpTimeout = 1.minute - test("serialized task larger than max RPC message size") { val conf = new SparkConf conf.set(RPC_MESSAGE_MAX_SIZE, 1) @@ -180,7 +178,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo backend.driverEndpoint.askSync[Boolean]( RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) - sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis) + sc.listenerBus.waitUntilEmpty() assert(executorAddedCount === 3) } @@ -251,7 +249,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3")) assert(execResources(GPU).assignedAddrs.isEmpty) } - sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis) + sc.listenerBus.waitUntilEmpty() assert(executorAddedCount === 3) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 7cb7eceec615b..b86d24deb15fc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -172,9 +172,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def applicationAttemptId(): Option[String] = None } - /** Length of time to wait while draining listener events. */ - val WAIT_TIMEOUT_MILLIS = 10000 - /** * Listeners which records some information to verify in UTs. 
Getter-kind methods in this class * ensures the value is returned after ensuring there's no event to process, as well as the @@ -230,7 +227,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi _endedTasks.toSet } - private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty() } var sparkListener: EventInfoRecordingListener = null @@ -839,7 +836,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val testRdd = new MyRDD(sc, 0, Nil) val waiter = scheduler.submitJob(testRdd, func, Seq.empty, CallSite.empty, resultHandler, properties) - sc.listenerBus.waitUntilEmpty(1000L) + sc.listenerBus.waitUntilEmpty() assert(assertionError.get() === null) } @@ -957,7 +954,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi completeNextResultStageWithSuccess(1, 1) // Confirm job finished successfully - sc.listenerBus.waitUntilEmpty(1000) + sc.listenerBus.waitUntilEmpty() assert(ended) assert(results === (0 until parts).map { idx => idx -> 42 }.toMap) assertDataStructuresEmpty() @@ -994,7 +991,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } else { // Stage should have been aborted and removed from running stages assertDataStructuresEmpty() - sc.listenerBus.waitUntilEmpty(1000) + sc.listenerBus.waitUntilEmpty() assert(ended) jobResult match { case JobFailed(reason) => @@ -1116,7 +1113,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi completeNextResultStageWithSuccess(2, 1) assertDataStructuresEmpty() - sc.listenerBus.waitUntilEmpty(1000) + sc.listenerBus.waitUntilEmpty() assert(ended) assert(results === Map(0 -> 42)) } @@ -1175,7 +1172,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Complete the result stage. completeNextResultStageWithSuccess(1, 1) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() assertDataStructuresEmpty() } @@ -1204,7 +1201,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Complete the result stage. completeNextResultStageWithSuccess(1, 0) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() assertDataStructuresEmpty() } @@ -1230,7 +1227,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi null)) // Assert the stage has been cancelled. 
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() assert(failure.getMessage.startsWith("Job aborted due to stage failure: Could not recover " + "from a failed barrier ResultStage.")) } @@ -2668,7 +2665,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi sc.parallelize(1 to tasks, tasks).foreach { _ => accum.add(1L) } - sc.listenerBus.waitUntilEmpty(1000) + sc.listenerBus.waitUntilEmpty() assert(foundCount.get() === tasks) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 8903e1054f53d..890abca1846fb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -38,9 +38,6 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match import LiveListenerBus._ - /** Length of time to wait while draining listener events. */ - val WAIT_TIMEOUT_MILLIS = 10000 - val jobCompletionTime = 1421191296660L private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext]) @@ -65,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match sc.listenerBus.addToSharedQueue(listener) sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() sc.stop() assert(listener.sparkExSeen) @@ -97,7 +94,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // Starting listener bus should flush all buffered events bus.start(mockSparkContext, mockMetricsSystem) Mockito.verify(mockMetricsSystem).registerSource(bus.metrics) - bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + bus.waitUntilEmpty() assert(counter.count === 5) assert(sharedQueueSize(bus) === 0) assert(eventProcessingTimeCount(bus) === 5) @@ -223,7 +220,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match rdd2.setName("Target RDD") rdd2.count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.stageInfos.size should be {1} val (stageInfo, taskInfoMetrics) = listener.stageInfos.head diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 6e1ee6da9200d..edafdaa54945a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -849,7 +849,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSparkSessi sparkContext.addSparkListener(jobListener) try { val result = f - sparkContext.listenerBus.waitUntilEmpty(10000L) + sparkContext.listenerBus.waitUntilEmpty() assert(numJobTrigered === 0) result } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b4ddfecaee469..3d069b2f118bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2114,7 +2114,7 @@ class DataFrameSuite extends QueryTest with SharedSparkSession { val df = spark.read.json(path.getCanonicalPath) assert(df.columns === Array("i", "p")) - spark.sparkContext.listenerBus.waitUntilEmpty(10000) + 
spark.sparkContext.listenerBus.waitUntilEmpty() assert(numJobs == 1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 23848d90dc53d..e4c10d1990f25 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -488,7 +488,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { sparkContext.addSparkListener(bytesReadListener) try { spark.read.csv(path).limit(1).collect() - sparkContext.listenerBus.waitUntilEmpty(1000L) + sparkContext.listenerBus.waitUntilEmpty() assert(bytesReads.sum === 7860) } finally { sparkContext.removeSparkListener(bytesReadListener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 80c1e24bfa568..28a027690db04 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2609,14 +2609,14 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { } // Make sure no spurious job starts are pending in the listener bus. - sparkContext.listenerBus.waitUntilEmpty(500) + sparkContext.listenerBus.waitUntilEmpty() sparkContext.addSparkListener(listener) try { // Execute the command. sql("show databases").head() // Make sure we have seen all events triggered by DataFrame.show() - sparkContext.listenerBus.waitUntilEmpty(500) + sparkContext.listenerBus.waitUntilEmpty() } finally { sparkContext.removeSparkListener(listener) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala index 1d461a03fd1f6..31957a99e15af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala @@ -158,7 +158,7 @@ class SessionStateSuite extends SparkFunSuite { assert(forkedSession ne activeSession) assert(forkedSession.listenerManager ne activeSession.listenerManager) runCollectQueryOn(forkedSession) - activeSession.sparkContext.listenerBus.waitUntilEmpty(1000) + activeSession.sparkContext.listenerBus.waitUntilEmpty() assert(collectorA.commands.length == 1) // forked should callback to A assert(collectorA.commands(0) == "collect") @@ -166,14 +166,14 @@ class SessionStateSuite extends SparkFunSuite { // => changes to forked do not affect original forkedSession.listenerManager.register(collectorB) runCollectQueryOn(activeSession) - activeSession.sparkContext.listenerBus.waitUntilEmpty(1000) + activeSession.sparkContext.listenerBus.waitUntilEmpty() assert(collectorB.commands.isEmpty) // original should not callback to B assert(collectorA.commands.length == 2) // original should still callback to A assert(collectorA.commands(1) == "collect") // <= changes to original do not affect forked activeSession.listenerManager.register(collectorC) runCollectQueryOn(forkedSession) - activeSession.sparkContext.listenerBus.waitUntilEmpty(1000) + activeSession.sparkContext.listenerBus.waitUntilEmpty() assert(collectorC.commands.isEmpty) // forked should not callback to C assert(collectorA.commands.length == 3) // forked should still callback to A assert(collectorB.commands.length == 1) // forked should still callback to B diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 2a034bcdc3f00..268fb65144e72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -360,13 +360,13 @@ class UDFSuite extends QueryTest with SharedSparkSession { .withColumn("b", udf1($"a", lit(10))) df.cache() df.write.saveAsTable("t") - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(numTotalCachedHit == 1, "expected to be cached in saveAsTable") df.write.insertInto("t") - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(numTotalCachedHit == 2, "expected to be cached in insertInto") df.write.save(path.getCanonicalPath) - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(numTotalCachedHit == 3, "expected to be cached in save for native") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index cd804adfa2133..2b3340527a4e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -173,7 +173,7 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { withTempPath { path => val inputData = spark.range(10) inputData.write.format(format).save(path.getCanonicalPath) - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(commands.length == 1) assert(commands.head._1 == "save") assert(commands.head._2.isInstanceOf[InsertIntoHadoopFsRelationCommand]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index e7f1c42d7d7c5..ccefb53fc4b39 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -388,7 +388,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils { // Assume the execution plan is // PhysicalRDD(nodeId = 0) data.write.format("json").save(file.getAbsolutePath) - sparkContext.listenerBus.waitUntilEmpty(10000) + sparkContext.listenerBus.waitUntilEmpty() val executionIds = currentExecutionIds().diff(previousExecutionIds) assert(executionIds.size === 1) val executionId = executionIds.head diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 90966d2efec23..88864ccec7523 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -480,7 +480,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils // At the beginning of this test case, there should be no live data in the listener. 
assert(listener.noLiveData()) spark.sparkContext.parallelize(1 to 10).foreach(i => ()) - spark.sparkContext.listenerBus.waitUntilEmpty(10000) + spark.sparkContext.listenerBus.waitUntilEmpty() // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL // execution ends, which will not be triggered for non-SQL jobs. assert(listener.noLiveData()) @@ -673,7 +673,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { case e: SparkException => // This is expected for a failed job } } - sc.listenerBus.waitUntilEmpty(10000) + sc.listenerBus.waitUntilEmpty() val statusStore = spark.sharedState.statusStore assert(statusStore.executionsCount() <= 50) assert(statusStore.planGraphCount() <= 50) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index d84dc62a696b9..4c58cb85c4d36 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -49,7 +49,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(spark.streams.active.isEmpty) assert(spark.streams.listListeners().isEmpty) // Make sure we don't leak any events to the next test - spark.sparkContext.listenerBus.waitUntilEmpty(10000) + spark.sparkContext.listenerBus.waitUntilEmpty() } testQuietly("single listener, check trigger events are generated correctly") { @@ -320,7 +320,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { q.recentProgress.size > 1 && q.recentProgress.size <= 11 } testStream(input.toDS)(actions: _*) - spark.sparkContext.listenerBus.waitUntilEmpty(10000) + spark.sparkContext.listenerBus.waitUntilEmpty() // 11 is the max value of the possible numbers of events. 
assert(numProgressEvent > 1 && numProgressEvent <= 11) } finally { @@ -343,7 +343,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AddData(mem, 1, 2, 3), CheckAnswer(1, 2, 3) ) - session.sparkContext.listenerBus.waitUntilEmpty(5000) + session.sparkContext.listenerBus.waitUntilEmpty() } def assertEventsCollected(collector: EventCollector): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala index 7801d968e901d..d538d93b845b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala @@ -39,7 +39,7 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter { StopStream ) - spark.sparkContext.listenerBus.waitUntilEmpty(5000) + spark.sparkContext.listenerBus.waitUntilEmpty() assert(TestListener.queryStartedEvent != null) assert(TestListener.queryTerminatedEvent != null) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index b98626a34cc29..01a03e484c90d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -277,7 +277,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with .format(classOf[NoopDataSource].getName) .mode(SaveMode.Append) .save() - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[AppendData]) // overwrite mode creates `OverwriteByExpression` @@ -285,21 +285,21 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with .format(classOf[NoopDataSource].getName) .mode(SaveMode.Overwrite) .save() - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[OverwriteByExpression]) // By default the save mode is `ErrorIfExists` for data source v2. 
spark.range(10).write .format(classOf[NoopDataSource].getName) .save() - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[AppendData]) spark.range(10).write .format(classOf[NoopDataSource].getName) .mode("default") .save() - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(plan.isInstanceOf[AppendData]) } finally { spark.listenerManager.unregister(listener) @@ -1058,7 +1058,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with checkDatasetUnorderly( spark.read.parquet(dir.getCanonicalPath).as[(Long, Long)], 0L -> 0L, 1L -> 1L, 2L -> 2L) - sparkContext.listenerBus.waitUntilEmpty(10000) + sparkContext.listenerBus.waitUntilEmpty() assert(jobDescriptions.asScala.toList.exists( _.contains("Listing leaf files and directories for 3 paths"))) } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 18a82f0f84283..e96f05384c879 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -49,7 +49,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { df.select("i").collect() df.filter($"i" > 0).count() - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(metrics.length == 2) assert(metrics(0)._1 == "collect") @@ -80,7 +80,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { val e = intercept[SparkException](df.select(errorUdf($"i")).collect()) - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(metrics.length == 1) assert(metrics(0)._1 == "collect") assert(metrics(0)._2.analyzed.isInstanceOf[Project]) @@ -110,12 +110,12 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { df.collect() // Wait for the first `collect` to be caught by our listener. Otherwise the next `collect` will // reset the plan metrics. - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() df.collect() Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect() - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(metrics.length == 3) assert(metrics(0) === 1) assert(metrics(1) === 1) @@ -163,7 +163,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { // For this simple case, the peakExecutionMemory of a stage should be the data size of the // aggregate operator, as we only have one memory consuming operator per stage. 
- sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(metrics.length == 2) assert(metrics(0) == topAggDataSize) assert(metrics(1) == bottomAggDataSize) @@ -187,7 +187,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { withTempPath { path => spark.range(10).write.format("json").save(path.getCanonicalPath) - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(commands.length == 1) assert(commands.head._1 == "save") assert(commands.head._2.isInstanceOf[InsertIntoHadoopFsRelationCommand]) @@ -198,7 +198,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { withTable("tab") { sql("CREATE TABLE tab(i long) using parquet") // adds commands(1) via onSuccess spark.range(10).write.insertInto("tab") - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(commands.length == 3) assert(commands(2)._1 == "insertInto") assert(commands(2)._2.isInstanceOf[InsertIntoStatement]) @@ -209,7 +209,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { withTable("tab") { spark.range(10).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("tab") - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(commands.length == 5) assert(commands(4)._1 == "saveAsTable") assert(commands(4)._2.isInstanceOf[CreateTable]) @@ -221,7 +221,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession { val e = intercept[AnalysisException] { spark.range(10).select($"id", $"id").write.insertInto("tab") } - sparkContext.listenerBus.waitUntilEmpty(1000) + sparkContext.listenerBus.waitUntilEmpty() assert(errors.length == 1) assert(errors.head._1 == "insertInto") assert(errors.head._2 == e) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala index 79819e7655414..2fd6cb220ea3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala @@ -34,13 +34,13 @@ class ExecutionListenerManagerSuite extends SparkFunSuite with LocalSparkSession spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate() spark.sql("select 1").collect() - spark.sparkContext.listenerBus.waitUntilEmpty(1000) + spark.sparkContext.listenerBus.waitUntilEmpty() assert(INSTANCE_COUNT.get() === 1) assert(CALLBACK_COUNT.get() === 1) val cloned = spark.cloneSession() cloned.sql("select 1").collect() - spark.sparkContext.listenerBus.waitUntilEmpty(1000) + spark.sparkContext.listenerBus.waitUntilEmpty() assert(INSTANCE_COUNT.get() === 1) assert(CALLBACK_COUNT.get() === 2) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 9cd5d8c0e95e1..01dbe75a9eae8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -227,7 +227,7 @@ class StreamingListenerSuite extends TestSuiteBase with LocalStreamingContext wi // Post a Streaming event after stopping StreamingContext val receiverInfoStopped = ReceiverInfo(0, "test", false, "localhost", "0") 
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfoStopped)) - ssc.sparkContext.listenerBus.waitUntilEmpty(1000) + ssc.sparkContext.listenerBus.waitUntilEmpty() // The StreamingListener should not receive any event verifyNoMoreInteractions(streamingListener) } From 0c7f99027b0ce32d6b536d9eacfa73231d9d4258 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 19 Sep 2019 07:56:18 +0900 Subject: [PATCH 2/4] Fix compilation --- .../CoarseGrainedSchedulerBackendSuite.scala | 2 ++ .../spark/scheduler/SparkListenerSuite.scala | 22 +++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index f1fa819810bf3..7c6bee3a3488f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -46,6 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, Utils} class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with Eventually { + private val executorUpTimeout = 1.minute + test("serialized task larger than max RPC message size") { val conf = new SparkConf conf.set(RPC_MESSAGE_MAX_SIZE, 1) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 890abca1846fb..cda6fbad5419f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -245,7 +245,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match rdd3.setName("Trois") rdd1.count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.stageInfos.size should be {1} val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD @@ -254,7 +254,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match listener.stageInfos.clear() rdd2.count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.stageInfos.size should be {1} val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get stageInfo2.rddInfos.size should be {3} @@ -263,7 +263,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match listener.stageInfos.clear() rdd3.count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.stageInfos.size should be {2} // Shuffle map stage + result stage val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get stageInfo3.rddInfos.size should be {1} // ShuffledRDD @@ -279,7 +279,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val rdd2 = rdd1.map(_.toString) sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.stageInfos.size should be {1} val (stageInfo, _) = listener.stageInfos.head @@ -307,7 +307,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val numSlices = 16 val d = sc.parallelize(0 to 10000, numSlices).map(w) d.count() - 
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.stageInfos.size should be (1) val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1") @@ -318,7 +318,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match d4.setName("A Cogroup") d4.collectAsMap() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() listener.stageInfos.size should be (4) listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) => /** @@ -369,7 +369,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match .reduce { case (x, y) => x } assert(result === 1.to(maxRpcMessageSize).toArray) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() val TASK_INDEX = 0 assert(listener.startedTasks.contains(TASK_INDEX)) assert(listener.startedGettingResultTasks.contains(TASK_INDEX)) @@ -385,7 +385,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x } assert(result === 2) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.waitUntilEmpty() val TASK_INDEX = 0 assert(listener.startedTasks.contains(TASK_INDEX)) assert(listener.startedGettingResultTasks.isEmpty) @@ -440,7 +440,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // Post events to all listeners, and wait until the queue is drained (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } - bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + bus.waitUntilEmpty() // The exception should be caught, and the event should be propagated to other listeners assert(jobCounter1.count === 5) @@ -510,7 +510,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // after we post one event, both interrupting listeners should get removed, and the // event log queue should be removed bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) - bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + bus.waitUntilEmpty() assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE)) assert(bus.findListenersByClass[BasicJobCounter]().size === 2) assert(bus.findListenersByClass[InterruptingListener]().size === 0) @@ -519,7 +519,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // posting more events should be fine, they'll just get processed from the OK queue. (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } - bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + bus.waitUntilEmpty() assert(counter1.count === 6) assert(counter2.count === 6) From 769c1ee7aaf030b27e7293b19d9b6be08f77ad5c Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 19 Sep 2019 08:53:14 +0900 Subject: [PATCH 3/4] Reflect review comment --- .../main/scala/org/apache/spark/scheduler/LiveListenerBus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index fc8f548bd851d..bbbddd86cad39 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -193,7 +193,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) { * Exposed for testing. 
*/ @throws(classOf[TimeoutException]) - def waitUntilEmpty(): Unit = { + private[spark] def waitUntilEmpty(): Unit = { waitUntilEmpty(TimeUnit.SECONDS.toMillis(10)) } From 8e661f7d66993e9921a2df72cf9c21573c7edfac Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 20 Sep 2019 14:03:06 +0900 Subject: [PATCH 4/4] Roll back the case where the wait timeout was reduced (my bad) --- .../spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 7c6bee3a3488f..3edbbeb9c08f1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -180,7 +180,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo backend.driverEndpoint.askSync[Boolean]( RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) - sc.listenerBus.waitUntilEmpty() + sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis) assert(executorAddedCount === 3) } @@ -251,7 +251,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3")) assert(execResources(GPU).assignedAddrs.isEmpty) } - sc.listenerBus.waitUntilEmpty() + sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis) assert(executorAddedCount === 3) }
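
For reference, the net effect of the series on the listener bus API is the overload pair sketched below. This is a minimal, self-contained illustration, not the real class: EventBus and its polling loop are hypothetical stand-ins for LiveListenerBus and its queue internals; only the no-arg overload delegating via TimeUnit.SECONDS.toMillis(10) matches the patch itself.

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, TimeoutException}

// Hypothetical stand-in for LiveListenerBus, showing the overload pattern the
// series introduces: a test-only no-arg variant that delegates to the timed
// variant with one unified default, replacing per-call-site constants such as
// 500, 1000, 5000 and 10000 ms.
class EventBus {
  private val queue = new LinkedBlockingQueue[AnyRef]()

  def post(event: AnyRef): Unit = queue.put(event)

  def take(): AnyRef = queue.take()

  /** For testing only. Wait for the queue to drain, with the unified 10-second default. */
  @throws(classOf[TimeoutException])
  def waitUntilEmpty(): Unit = waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))

  /** Wait until there are no more events in the queue, or throw after `timeoutMillis`. */
  @throws(classOf[TimeoutException])
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (!queue.isEmpty) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.")
      }
      Thread.sleep(10) // simplified polling; the real bus blocks on its queues
    }
  }
}

With this shape, bus.waitUntilEmpty() behaves like bus.waitUntilEmpty(10000L), which is why the suites above can drop their local constants such as WAIT_TIMEOUT_MILLIS, while CoarseGrainedSchedulerBackendSuite keeps the timed overload for its deliberately longer executorUpTimeout wait.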