
Commit 5e92301

HeartSaVioR authored and dongjoon-hyun committed
[SPARK-29161][CORE][SQL][STREAMING] Unify default wait time for waitUntilEmpty
### What changes were proposed in this pull request?

This is a follow-up of the [review comment](#25706 (comment)). This patch unifies the default wait time to be 10 seconds, as that fits most of the UTs (they have smaller timeouts) and doesn't add latency, since the call returns as soon as the condition is met.

This patch doesn't touch the call that waits 100000 milliseconds (100 seconds), to avoid breaking anything unintentionally, though it's questionable whether we really need to wait for 100 seconds.

### Why are the changes needed?

It simplifies the test code and gets rid of the various heuristic timeout values.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

CI build will test the patch, as it is the best environment to do so (builds are running there).

Closes #25837 from HeartSaVioR/MINOR-unify-default-wait-time-for-wait-until-empty.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent: bd05339

26 files changed: +82 −80 lines
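The commit's "no additional latency" claim rests on the fact that a timed wait returns as soon as the event queue drains; the 10-second default only bounds the failure case. A minimal Scala sketch of that deadline-based pattern (illustrative only; the helper name and signature are not the actual `LiveListenerBus` internals):

```scala
import java.util.concurrent.{TimeUnit, TimeoutException}

// Illustrative sketch, not Spark's implementation: a wait that returns as
// soon as the condition is met, so a generous default timeout adds no
// latency on the success path and only bounds how long a failure can hang.
def waitUntilEmpty(queueIsEmpty: () => Boolean,
                   timeoutMillis: Long = TimeUnit.SECONDS.toMillis(10)): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMillis
  while (!queueIsEmpty()) {
    if (System.currentTimeMillis() >= deadline) {
      throw new TimeoutException(s"Queue did not empty within $timeoutMillis ms")
    }
    Thread.sleep(10) // brief back-off before re-checking
  }
}
```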

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -264,7 +264,7 @@ private[spark] object TestUtils {
     try {
       body(listener)
     } finally {
-      sc.listenerBus.waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))
+      sc.listenerBus.waitUntilEmpty()
       sc.listenerBus.removeListener(listener)
     }
   }

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 11 additions & 0 deletions
@@ -186,6 +186,17 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     metricsSystem.registerSource(metrics)
   }
 
+  /**
+   * For testing only. Wait until there are no more events in the queue, or until the default
+   * wait time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
+   * emptied.
+   * Exposed for testing.
+   */
+  @throws(classOf[TimeoutException])
+  private[spark] def waitUntilEmpty(): Unit = {
+    waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))
+  }
+
   /**
    * For testing only. Wait until there are no more events in the queue, or until the specified
    * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
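With the overload above in place, each test call site shrinks to the no-arg form, as the per-file diffs below show. A minimal usage sketch (assuming a test with a running `SparkContext` named `sc`):

```scala
// Sketch: generate some listener events, then drain the bus before asserting.
sc.parallelize(1 to 100, 4).count()
// Blocks until the queue is empty, waiting up to the default 10 seconds;
// throws TimeoutException if events are still pending when the deadline hits.
sc.listenerBus.waitUntilEmpty()
// Listener-based assertions are now safe to run.
```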

core/src/test/scala/org/apache/spark/AccumulatorSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -126,7 +126,7 @@ private[spark] object AccumulatorSuite {
     sc.addSparkListener(listener)
     testBody
     // wait until all events have been processed before proceeding to assert things
-    sc.listenerBus.waitUntilEmpty(10 * 1000)
+    sc.listenerBus.waitUntilEmpty()
     val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
     val isSet = accums.exists { a =>
       a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
   private def post(event: SparkListenerEvent): Unit = {
     listenerBus.post(event)
-    listenerBus.waitUntilEmpty(1000)
+    listenerBus.waitUntilEmpty()
   }
 
   test("initialize dynamic allocation in SparkContext") {

core/src/test/scala/org/apache/spark/ShuffleSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -498,7 +498,7 @@ object ShuffleSuite {
 
     job
 
-    sc.listenerBus.waitUntilEmpty(500)
+    sc.listenerBus.waitUntilEmpty()
     AggregatedShuffleMetrics(recordsWritten, recordsRead, bytesWritten, bytesRead)
   }
 }

core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
     assert(sc.getRDDStorageInfo.length === 0)
     rdd.collect()
-    sc.listenerBus.waitUntilEmpty(10000)
+    sc.listenerBus.waitUntilEmpty()
     eventually(timeout(10.seconds), interval(100.milliseconds)) {
       assert(sc.getRDDStorageInfo.length === 1)
     }

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -716,7 +716,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       }
       x
     }.collect()
-    sc.listenerBus.waitUntilEmpty(10000)
+    sc.listenerBus.waitUntilEmpty()
     // As executors will send the metrics of running tasks via heartbeat, we can use this to check
     // whether there is any running task.
     eventually(timeout(10.seconds)) {

core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala

Lines changed: 2 additions & 5 deletions
@@ -29,9 +29,6 @@ import org.apache.spark.util.SparkConfWithEnv
 
 class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
-  /** Length of time to wait while draining listener events. */
-  private val WAIT_TIMEOUT_MILLIS = 10000
-
   test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,1024]", "test")
 
@@ -41,7 +38,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
     // Trigger a job so that executors get added
     sc.parallelize(1 to 100, 4).map(_.toString).count()
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
       // Browse to each URL to check that it's valid
@@ -61,7 +58,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
     // Trigger a job so that executors get added
     sc.parallelize(1 to 100, 4).map(_.toString).count()
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
     assert(listeners.size === 1)
     val listener = listeners(0)

core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -182,7 +182,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
       .reduceByKey(_ + _)
       .saveAsTextFile(tmpFile.toURI.toString)
 
-    sc.listenerBus.waitUntilEmpty(500)
+    sc.listenerBus.waitUntilEmpty()
     assert(inputRead == numRecords)
 
     assert(outputWritten == numBuckets)
@@ -243,7 +243,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     val taskMetrics = new ArrayBuffer[Long]()
 
     // Avoid receiving earlier taskEnd events
-    sc.listenerBus.waitUntilEmpty(500)
+    sc.listenerBus.waitUntilEmpty()
 
     sc.addSparkListener(new SparkListener() {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -253,7 +253,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
 
     job
 
-    sc.listenerBus.waitUntilEmpty(500)
+    sc.listenerBus.waitUntilEmpty()
     taskMetrics.sum
   }
 
@@ -293,7 +293,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
 
     try {
       rdd.saveAsTextFile(outPath.toString)
-      sc.listenerBus.waitUntilEmpty(500)
+      sc.listenerBus.waitUntilEmpty()
       assert(taskBytesWritten.length == 2)
       val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
       taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 9 additions & 12 deletions
@@ -172,9 +172,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     override def applicationAttemptId(): Option[String] = None
   }
 
-  /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
-
   /**
    * Listeners which records some information to verify in UTs. Getter-kind methods in this class
    * ensures the value is returned after ensuring there's no event to process, as well as the
@@ -230,7 +227,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       _endedTasks.toSet
     }
 
-    private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty()
   }
 
   var sparkListener: EventInfoRecordingListener = null
@@ -839,7 +836,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val testRdd = new MyRDD(sc, 0, Nil)
     val waiter = scheduler.submitJob(testRdd, func, Seq.empty, CallSite.empty,
       resultHandler, properties)
-    sc.listenerBus.waitUntilEmpty(1000L)
+    sc.listenerBus.waitUntilEmpty()
     assert(assertionError.get() === null)
   }
 
@@ -957,7 +954,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     completeNextResultStageWithSuccess(1, 1)
 
     // Confirm job finished successfully
-    sc.listenerBus.waitUntilEmpty(1000)
+    sc.listenerBus.waitUntilEmpty()
     assert(ended)
     assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
     assertDataStructuresEmpty()
@@ -994,7 +991,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     } else {
      // Stage should have been aborted and removed from running stages
      assertDataStructuresEmpty()
-     sc.listenerBus.waitUntilEmpty(1000)
+     sc.listenerBus.waitUntilEmpty()
      assert(ended)
      jobResult match {
        case JobFailed(reason) =>
@@ -1116,7 +1113,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     completeNextResultStageWithSuccess(2, 1)
 
     assertDataStructuresEmpty()
-    sc.listenerBus.waitUntilEmpty(1000)
+    sc.listenerBus.waitUntilEmpty()
     assert(ended)
     assert(results === Map(0 -> 42))
   }
@@ -1175,7 +1172,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // Complete the result stage.
     completeNextResultStageWithSuccess(1, 1)
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     assertDataStructuresEmpty()
   }
 
@@ -1204,7 +1201,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // Complete the result stage.
     completeNextResultStageWithSuccess(1, 0)
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     assertDataStructuresEmpty()
   }
 
@@ -1230,7 +1227,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       null))
 
     // Assert the stage has been cancelled.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     assert(failure.getMessage.startsWith("Job aborted due to stage failure: Could not recover " +
       "from a failed barrier ResultStage."))
   }
@@ -2668,7 +2665,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     sc.parallelize(1 to tasks, tasks).foreach { _ =>
       accum.add(1L)
     }
-    sc.listenerBus.waitUntilEmpty(1000)
+    sc.listenerBus.waitUntilEmpty()
     assert(foundCount.get() === tasks)
   }
 }
