From 6cbf7e7a822ec795f8a91fc1fbd2998bd79a93b2 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 21 Apr 2019 18:23:47 -0700 Subject: [PATCH 1/4] fix the flaky tests. --- .../scala/org/apache/spark/SparkContextInfoSuite.scala | 9 +++++++-- .../spark/deploy/history/FsHistoryProviderSuite.scala | 4 ++-- .../apache/spark/deploy/yarn/BaseYarnClusterSuite.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 4 ++-- .../sql/execution/ui/SQLAppStatusListenerSuite.scala | 3 ++- .../spark/sql/streaming/FileStreamSourceSuite.scala | 2 +- .../org/apache/spark/sql/streaming/StreamTest.scala | 2 +- .../scala/org/apache/spark/streaming/ReceiverSuite.scala | 2 +- 8 files changed, 17 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala index a57afdfeadf0..536b4aec7562 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark +import scala.concurrent.duration._ + import org.scalatest.Assertions +import org.scalatest.concurrent.Eventually._ import org.apache.spark.storage.StorageLevel @@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext { test("getRDDStorageInfo only reports on RDDs that actually persist data") { sc = new SparkContext("local", "test") val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() - assert(sc.getRDDStorageInfo.size === 0) + assert(sc.getRDDStorageInfo.length === 0) rdd.collect() sc.listenerBus.waitUntilEmpty(10000) - assert(sc.getRDDStorageInfo.size === 1) + eventually(timeout(10.seconds), interval(100.milliseconds)) { + assert(sc.getRDDStorageInfo.length === 1) + } assert(sc.getRDDStorageInfo.head.isCached) assert(sc.getRDDStorageInfo.head.memSize > 0) assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY) diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 1a326a8af3b2..fdc1511c2ee3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -733,7 +733,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1.second), interval(10.milliseconds)) { + eventually(timeout(3.second), interval(10.milliseconds)) { provider.getConfig().keys should not contain ("HDFS State") } } finally { @@ -751,7 +751,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1.second), interval(10.milliseconds)) { + eventually(timeout(3.second), interval(10.milliseconds)) { verify(errorHandler).uncaughtException(any(), any()) } } finally { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 384a5f4e46b7..c7a99bf7f1cb 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -167,7 +167,7 @@ abstract class BaseYarnClusterSuite val handle = launcher.startApplication() try { - eventually(timeout(2.minutes), interval(1.second)) { + eventually(timeout(3.minutes), interval(1.second)) { assert(handle.getState().isFinal()) } } finally { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index b072202b7ce3..a594a8791448 
100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -205,7 +205,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { .startApplication() try { - eventually(timeout(30.seconds), interval(100.milliseconds)) { + eventually(timeout(180.seconds), interval(100.milliseconds)) { handle.getState() should be (SparkAppHandle.State.RUNNING) } @@ -213,7 +213,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { handle.getAppId() should startWith ("application_") handle.stop() - eventually(timeout(30.seconds), interval(100.milliseconds)) { + eventually(timeout(180.seconds), interval(100.milliseconds)) { handle.getState() should be (SparkAppHandle.State.KILLED) } } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index d845117b58c6..e3e5ddff9637 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -502,7 +502,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with } // Wait for listener to finish computing the metrics for the execution. 
- while (statusStore.executionsList().last.metricValues == null) { + while (statusStore.executionsList().isEmpty || + statusStore.executionsList().last.metricValues == null) { Thread.sleep(100) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 9235c6d7c896..33b4c08080d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -195,7 +195,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { import testImplicits._ - override val streamingTimeout = 20.seconds + override val streamingTimeout = 80.seconds /** Use `format` and `path` to create FileStreamSource via DataFrameReader */ private def createFileStreamSource( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 89ce63635860..52b8e4732e80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -89,7 +89,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be protected val defaultUseV2Sink = false /** How long to wait for an active stream to catch up when checking a result. */ - val streamingTimeout = 10.seconds + val streamingTimeout = 60.seconds /** A trait for actions that can be performed while testing a streaming DataFrame. 
*/ trait StreamAction diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 48aa9e599504..6b664b7a7dfd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -122,7 +122,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { } // Verify that stopping actually stops the thread - failAfter(100.milliseconds) { + failAfter(1.second) { receiver.stop("test") assert(receiver.isStopped) assert(!receiver.otherThread.isAlive) From 857657d6f3cca8610928be892a6e1d3e6bad7111 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 22 Apr 2019 08:53:59 -0700 Subject: [PATCH 2/4] more --- .../ExecutorAllocationManagerSuite.scala | 18 +++++--- .../org/apache/spark/SparkFunSuite.scala | 46 ++++++++++++++++++- .../org/apache/spark/StatusTrackerSuite.scala | 2 +- .../history/FsHistoryProviderSuite.scala | 16 ++++--- .../SparkListenerWithClusterSuite.scala | 10 ++-- .../StreamingQueryManagerSuite.scala | 18 +++++--- 6 files changed, 82 insertions(+), 28 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 12c8a9d6c9c4..9c30124dd472 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.mockito.ArgumentMatchers.{any, eq => meq} import org.mockito.Mockito.{mock, never, verify, when} -import org.scalatest.{BeforeAndAfter, PrivateMethodTester} +import org.scalatest.PrivateMethodTester import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config @@ -38,20 +38,24 @@ import org.apache.spark.util.ManualClock */ class 
ExecutorAllocationManagerSuite extends SparkFunSuite - with LocalSparkContext - with BeforeAndAfter { + with LocalSparkContext { import ExecutorAllocationManager._ import ExecutorAllocationManagerSuite._ private val contexts = new mutable.ListBuffer[SparkContext]() - before { + override def beforeEach(): Unit = { + super.beforeEach() contexts.clear() } - after { - contexts.foreach(_.stop()) + override def afterEach(): Unit = { + try { + contexts.foreach(_.stop()) + } finally { + super.afterEach() + } } private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = { @@ -282,7 +286,7 @@ class ExecutorAllocationManagerSuite assert(totalRunningTasks(manager) === 0) } - test("cancel pending executors when no longer needed") { + testRetry("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5))) diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 34e386819667..e274514f3bff 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -20,8 +20,10 @@ package org.apache.spark // scalastyle:off import java.io.File +import scala.annotation.tailrec + import org.apache.log4j.{Appender, Level, Logger} -import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Tests.IS_TESTING @@ -54,6 +56,7 @@ import org.apache.spark.util.{AccumulatorContext, Utils} abstract class SparkFunSuite extends FunSuite with BeforeAndAfterAll + with BeforeAndAfterEach with ThreadAudit with Logging { // scalastyle:on @@ -89,6 +92,47 @@ abstract class SparkFunSuite getTestResourceFile(file).getCanonicalPath } + /** + 
* Note: this method doesn't support [[BeforeAndAfter]]. You must use [[BeforeAndAfterEach]] to + * set up and tear down resources. + */ + def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = { + test(s) { + retry(n) { + body + } + } + } + + /** + * Note: this method doesn't support [[BeforeAndAfter]]. You must use [[BeforeAndAfterEach]] to + * set up and tear down resources. + */ + def retry[T](n: Int)(body: => T): T = { + if (this.isInstanceOf[BeforeAndAfter]) { + throw new UnsupportedOperationException( + s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " + + s"Please use ${classOf[BeforeAndAfterEach]} instead.") + } + retry0(n, n)(body) + } + + @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = { + try body + catch { case e: Throwable => + if (n > 0) { + logWarning(e.getMessage, e) + logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n") + // Reset state before re-attempting so that tests which use patterns like + // LocalSparkContext to clean up state can work correctly when retried. + afterEach() + beforeEach() + retry0(n-1, n0)(body) + } + else throw e + } + } + /** * Log the suite name and the test name before and after each test. 
* diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index c96db4e63941..f527bbe71852 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.JobExecutionStatus._ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext { - test("basic status API usage") { + testRetry("basic status API usage") { sc = new SparkContext("local", "test", new SparkConf(false)) val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync() val jobId: Int = eventually(timeout(10.seconds)) { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index fdc1511c2ee3..86575b196922 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -34,7 +34,6 @@ import org.apache.hadoop.security.AccessControlException import org.json4s.jackson.JsonMethods._ import org.mockito.ArgumentMatchers.{any, argThat} import org.mockito.Mockito.{doThrow, mock, spy, verify, when} -import org.scalatest.BeforeAndAfter import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -52,16 +51,21 @@ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} import org.apache.spark.util.logging.DriverLogger -class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { +class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { private var testDir: File = null - before { + override def beforeEach(): Unit = { + super.beforeEach() testDir = 
Utils.createTempDir(namePrefix = s"a b%20c+d") } - after { - Utils.deleteRecursively(testDir) + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(testDir) + } finally { + super.afterEach() + } } /** Create a fake log file using the new log format used in Spark 1.3+ */ @@ -741,7 +745,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } - test("provider reports error after FS leaves safe mode") { + testRetry("provider reports error after FS leaves safe mode") { testDir.delete() val clock = new ManualClock() val provider = new SafeModeTestProvider(createTestConf(), clock) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala index 123f7f49d21b..a6576e0d1c52 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala @@ -19,25 +19,23 @@ package org.apache.spark.scheduler import scala.collection.mutable -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} - import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils} import org.apache.spark.scheduler.cluster.ExecutorInfo /** * Unit tests for SparkListener that require a local cluster. */ -class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext - with BeforeAndAfter with BeforeAndAfterAll { +class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext { /** Length of time to wait while draining listener events. 
*/ val WAIT_TIMEOUT_MILLIS = 10000 - before { + override def beforeEach(): Unit = { + super.beforeEach() sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite") } - test("SparkListener sends executor added message") { + testRetry("SparkListener sends executor added message") { val listener = new SaveExecutorInfo sc.addSparkListener(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index b421809f62d8..b26d2556b2e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -23,7 +23,6 @@ import scala.concurrent.Future import scala.util.Random import scala.util.control.NonFatal -import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ @@ -35,21 +34,26 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.Utils -class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { +class StreamingQueryManagerSuite extends StreamTest { import AwaitTerminationTester._ import testImplicits._ override val streamingTimeout = 20.seconds - before { + override def beforeEach(): Unit = { + super.beforeEach() assert(spark.streams.active.isEmpty) spark.streams.resetTerminated() } - after { - assert(spark.streams.active.isEmpty) - spark.streams.resetTerminated() + override def afterEach(): Unit = { + try { + assert(spark.streams.active.isEmpty) + spark.streams.resetTerminated() + } finally { + super.afterEach() + } } testQuietly("listing") { @@ -83,7 +87,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { } } - testQuietly("awaitAnyTermination without timeout 
and resetTerminated") { + testRetry("awaitAnyTermination without timeout and resetTerminated") { val datasets = Seq.fill(5)(makeDataset._2) withQueriesOn(datasets: _*) { queries => require(queries.size === datasets.size) From 322496656d6e727d9e55c181f0b2769c3cce67ce Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 22 Apr 2019 08:56:09 -0700 Subject: [PATCH 3/4] address comment. --- .../scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index a594a8791448..c5142fa550bc 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -205,7 +205,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { .startApplication() try { - eventually(timeout(180.seconds), interval(100.milliseconds)) { + eventually(timeout(3.minutes), interval(100.milliseconds)) { handle.getState() should be (SparkAppHandle.State.RUNNING) } @@ -213,7 +213,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { handle.getAppId() should startWith ("application_") handle.stop() - eventually(timeout(180.seconds), interval(100.milliseconds)) { + eventually(timeout(3.minutes), interval(100.milliseconds)) { handle.getState() should be (SparkAppHandle.State.KILLED) } } finally { From e5a5991529f9af659e734e5ed684f12f80b5e5bb Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 24 Apr 2019 15:29:13 +0900 Subject: [PATCH 4/4] Fix Javadoc generation issue in PR 24434 (#5) --- core/src/test/scala/org/apache/spark/SparkFunSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index e274514f3bff..9dd113262653 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -93,7 +93,7 @@ abstract class SparkFunSuite } /** - * Note: this method doesn't support [[BeforeAndAfter]]. You must use [[BeforeAndAfterEach]] to + * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to * set up and tear down resources. */ def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = { @@ -105,7 +105,7 @@ abstract class SparkFunSuite } /** - * Note: this method doesn't support [[BeforeAndAfter]]. You must use [[BeforeAndAfterEach]] to + * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to * set up and tear down resources. */ def retry[T](n: Int)(body: => T): T = {