From 0608b9f4f05aa58bf264f6eb90c690b754c63814 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 27 Jul 2016 15:51:04 -0700 Subject: [PATCH 01/11] Add postfix ops import --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index cba6aa53f17e..9cf1ef62691a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1774,6 +1774,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } def testCommandAvailable(command: String): Boolean = { + import scala.language.postfixOps Try(Process(command) !!).isSuccess } } From bb82dce2b828da1e8aa99f8e6b1cd33c3ed7f05c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 27 Jul 2016 15:57:38 -0700 Subject: [PATCH 02/11] Wait, we can rewrite this to just avoid postfix and also avoid slurping the result as a string we throw away --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 9cf1ef62691a..f6df4b9bc97d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution import java.sql.{Date, Timestamp} import scala.sys.process.Process -import scala.util.Try import org.apache.hadoop.fs.Path @@ -1774,7 +1773,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } def testCommandAvailable(command: String): Boolean = { - import scala.language.postfixOps - Try(Process(command) !!).isSuccess + Process(command).run().exitValue() == 0 } }
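Patches 01 and 02 together trade the postfix `!!` call, which needs the `scala.language.postfixOps` feature import and buffers the command's entire stdout only to discard it, for a plain exit-code check. Here is a minimal, self-contained sketch of the two styles using only the Scala standard library; the object name, method names, and the `ls` command are illustrative, not part of the patch:

```scala
import scala.sys.process.Process
import scala.util.Try

object CommandCheck {
  // Patch 01's shape: postfix `!!` requires the feature import and returns
  // the command's whole stdout as a String; a non-zero exit surfaces as a
  // RuntimeException, so Try(...).isSuccess encodes "did the command work?".
  def viaPostfixBangBang(command: String): Boolean = {
    import scala.language.postfixOps
    Try(Process(command) !!).isSuccess
  }

  // Patch 02's shape: start the process and block on its exit code.
  // No postfix syntax and no output buffering; with the no-arg run(),
  // the child's output goes straight to this process's console.
  def viaExitValue(command: String): Boolean =
    Process(command).run().exitValue() == 0

  def main(args: Array[String]): Unit = {
    println(viaPostfixBangBang("ls")) // `ls` is only an example command
    println(viaExitValue("ls"))
  }
}
```

Besides dropping the feature import, the exit-code form avoids throwing and catching a `RuntimeException` on non-zero exit, which is all the `Try(...).isSuccess` wrapper was doing.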
From 1decad45510d48344f98fea85eb07b9498f77c48 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 13:47:55 -0700 Subject: [PATCH 03/11] Remove postfix !! on process --- core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index f8d523fa2c6a..0f2d4363d2e0 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -21,9 +21,7 @@ import java.io.File import scala.collection.Map import scala.io.Codec -import scala.language.postfixOps import scala.sys.process._ -import scala.util.Try import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{LongWritable, Text} @@ -215,7 +213,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } def testCommandAvailable(command: String): Boolean = { - Try(Process(command) !!).isSuccess + Process(command).run().exitValue() == 0 } def testExportInputFile(varName: String) { From b264f2d4eeb3eadb1e916fad5d2d8ab4d9ff01eb Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 15:01:08 -0700 Subject: [PATCH 04/11] Remove postfixOps from most places - still left in the XML parsing places because it seems to improve readability there --- .../spark/deploy/FaultToleranceTest.scala | 17 +++--- .../apache/spark/deploy/SparkHadoopUtil.scala | 1 - .../apache/spark/scheduler/DAGScheduler.scala | 3 +- .../spark/scheduler/TaskSchedulerImpl.scala | 1 - .../org/apache/spark/util/RpcUtils.scala | 2 - .../apache/spark/HeartbeatReceiverSuite.scala | 1 - .../org/apache/spark/SparkConfSuite.scala | 5 +- .../org/apache/spark/StatusTrackerSuite.scala | 35 ++++++------ .../history/ApplicationCacheSuite.scala | 1 - .../history/FsHistoryProviderSuite.scala | 5 +- .../deploy/history/HistoryServerSuite.scala | 7 ++- .../spark/deploy/master/MasterSuite.scala | 3 +- .../spark/launcher/LauncherBackendSuite.scala | 5 +- .../org/apache/spark/rpc/RpcEnvSuite.scala | 53 +++++++++---------- .../OutputCommitCoordinatorSuite.scala | 3 +- .../scheduler/TaskResultGetterSuite.scala | 4 +- .../BlockManagerReplicationSuite.scala | 1 - .../spark/storage/BlockManagerSuite.scala | 21 ++++---- .../spark/storage/MemoryStoreSuite.scala | 1 - .../apache/spark/util/EventLoopSuite.scala | 19 ++++--- .../flume/FlumePollingStreamSuite.scala | 3 +- .../streaming/flume/FlumeStreamSuite.scala | 5 +- .../streaming/kafka010/KafkaTestUtils.scala | 2 - .../kafka010/DirectKafkaStreamSuite.scala | 17 +++--- .../streaming/kafka/KafkaTestUtils.scala | 2 - .../kafka/DirectKafkaStreamSuite.scala | 13 +++-- .../streaming/kafka/KafkaStreamSuite.scala | 3 +- .../kafka/ReliableKafkaStreamSuite.scala | 5 +- .../kinesis/KinesisCheckpointerSuite.scala | 1 - .../kinesis/KinesisStreamSuite.scala | 1 - .../apache/spark/graphx/lib/PageRank.scala | 3 +- .../spark/mllib/util/MFDataGenerator.scala | 1 - .../spark/repl/ExecutorClassLoaderSuite.scala | 1 - .../apache/spark/sql/types/DecimalSuite.scala | 2 - .../apache/spark/sql/CachedTableSuite.scala | 9 ++-- .../org/apache/spark/sql/DataFrameSuite.scala | 1 - .../spark/sql/DatasetAggregatorSuite.scala | 2 - .../apache/spark/sql/DatasetCacheSuite.scala | 2 - .../spark/sql/DatasetPrimitiveSuite.scala | 2 - .../org/apache/spark/sql/DatasetSuite.scala | 2 - .../util/FileBasedWriteAheadLog.scala | 3 +- .../spark/streaming/InputStreamsSuite.scala | 1 - .../streaming/ReceivedBlockHandlerSuite.scala | 4 +- .../scheduler/JobGeneratorSuite.scala | 3 +- .../yarn/AMDelegationTokenRenewer.scala | 2 - .../deploy/yarn/BaseYarnClusterSuite.scala | 1 - .../spark/deploy/yarn/YarnClusterSuite.scala | 1 - .../yarn/YarnShuffleServiceSuite.scala | 1 - 48 files changed, 111 insertions(+), 170 deletions(-)
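The bulk of patch 04 is mechanical: each postfix duration literal such as `30 seconds` or `10 millis` becomes the dotted form `30.seconds` / `10.millis`, after which the `scala.language.postfixOps` imports can be dropped. A minimal sketch (plain Scala 2.x, standard library only; the object and method names are illustrative, not from the patch) of why the dotted form needs no feature import:

```scala
import scala.concurrent.duration._

object DurationStyles {
  // Postfix form: `30 seconds` is postfix-operator syntax, which compiles
  // only with scala.language.postfixOps in scope (historically a feature
  // warning without it, an error in newer compilers).
  def postfixForm: FiniteDuration = {
    import scala.language.postfixOps
    30 seconds
  }

  // Dotted form: an ordinary method call on the implicit DurationInt
  // wrapper from scala.concurrent.duration, so no feature import is needed.
  def dottedForm: FiniteDuration = 30.seconds

  def main(args: Array[String]): Unit = {
    // Both spellings build the same FiniteDuration value.
    println(postfixForm == dottedForm) // true
  }
}
```

Because both spellings resolve to the same `DurationInt` enrichment, call sites such as `eventually(timeout(10.seconds))` and `new RpcTimeout(600.seconds, "BlockManagerHeartbeat")` keep their meaning; only the surface syntax changes.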
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 79f4d06c8460..ff8b10c7cba3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.{Future, Promise} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ -import scala.language.postfixOps import scala.sys.process._ import org.json4s._ @@ -113,7 +112,7 @@ private object FaultToleranceTest extends App with Logging { assertValidClusterState() killLeader() - delay(30 seconds) + delay(30.seconds) assertValidClusterState() createClient() assertValidClusterState() @@ -127,12 +126,12 @@ private object FaultToleranceTest extends App with Logging { killLeader() addMasters(1) - delay(30 seconds) + delay(30.seconds) assertValidClusterState() killLeader() addMasters(1) - delay(30 seconds) + delay(30.seconds) assertValidClusterState() } @@ -157,7 +156,7 @@ private object FaultToleranceTest extends App with Logging { killLeader() workers.foreach(_.kill()) workers.clear() - delay(30 seconds) + delay(30.seconds) addWorkers(2) assertValidClusterState() } @@ -175,7 +174,7 @@ private object FaultToleranceTest extends App with Logging { (1 to 3).foreach { _ => killLeader() - delay(30 seconds) + delay(30.seconds) assertValidClusterState() assertTrue(getLeader == masters.head) addMasters(1) @@ -265,7 +264,7 @@ private object FaultToleranceTest extends App with Logging { } // Avoid waiting indefinitely (e.g., we could register but get no executors).
- assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) + assertTrue(ThreadUtils.awaitResult(f, 120.seconds)) } /** @@ -318,7 +317,7 @@ private object FaultToleranceTest extends App with Logging { } try { - assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) + assertTrue(ThreadUtils.awaitResult(f, 120.seconds)) } catch { case e: TimeoutException => logError("Master states: " + masters.map(_.state)) @@ -422,7 +421,7 @@ private object SparkDocker { } dockerCmd.run(ProcessLogger(findIpAndLog _)) - val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds) + val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds) val dockerId = Docker.getLastProcessId (ip, dockerId, outFile) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 90c71cc6cfab..671e8e4484f6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -25,7 +25,6 @@ import java.util.{Arrays, Comparator, Date} import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.control.NonFatal import com.google.common.primitives.Longs diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4eb7c81f9e8c..f684bedb040b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -27,7 +27,6 @@ import scala.collection.Map import scala.collection.mutable.{HashMap, HashSet, Stack} import scala.concurrent.duration._ import scala.language.existentials -import scala.language.postfixOps import scala.util.control.NonFatal import org.apache.commons.lang3.SerializationUtils @@ -233,7 +232,7 @@ class DAGScheduler( blockManagerId: BlockManagerId): Boolean = { listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates)) blockManagerMaster.driverEndpoint.askWithRetry[Boolean]( - BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat")) + BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600.seconds, "BlockManagerHeartbeat")) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2ce49ca1345f..dc05e764c395 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet -import scala.language.postfixOps import scala.util.Random import org.apache.spark._ diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index 2bb8de568e80..e3b588374ce1 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -17,8 +17,6 @@ package org.apache.spark.util -import scala.language.postfixOps - import org.apache.spark.SparkConf import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout} diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 5e2ba311ee77..5f59c176ab78 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -22,7 +22,6 @@ import java.util.concurrent.{ExecutorService, TimeUnit} import scala.collection.Map import scala.collection.mutable import scala.concurrent.duration._ -import scala.language.postfixOps import org.mockito.Matchers import org.mockito.Matchers._ diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index a883d1b57e52..1b17894e143e 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{Executors, TimeUnit} import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.{Random, Try} import com.esotericsoftware.kryo.Kryo @@ -262,10 +261,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(RpcUtils.retryWaitMs(conf) === 2L) conf.set("spark.akka.askTimeout", "3") - assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds)) + assert(RpcUtils.askRpcTimeout(conf).duration === (3.seconds)) conf.set("spark.akka.lookupTimeout", "4") - assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds)) + assert(RpcUtils.lookupRpcTimeout(conf).duration === (4.seconds)) } test("SPARK-13727") { diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index 5483f2b8434a..d0d2531ac71f 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark import scala.concurrent.duration._ import scala.language.implicitConversions -import scala.language.postfixOps import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -31,25 +30,25 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont test("basic status API usage") { sc = new SparkContext("local", "test", new SparkConf(false)) val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync() - val jobId: Int = eventually(timeout(10 seconds)) { + val jobId: Int = eventually(timeout(10.seconds)) { val jobIds = jobFuture.jobIds jobIds.size should be(1) jobIds.head } - val jobInfo = eventually(timeout(10 seconds)) { + val jobInfo = eventually(timeout(10.seconds)) { sc.statusTracker.getJobInfo(jobId).get } jobInfo.status() should not be FAILED val stageIds = jobInfo.stageIds() stageIds.size should be(2) - val firstStageInfo = eventually(timeout(10 seconds)) { + val firstStageInfo = eventually(timeout(10.seconds)) { sc.statusTracker.getStageInfo(stageIds(0)).get } firstStageInfo.stageId() should be(stageIds(0)) firstStageInfo.currentAttemptId() should be(0) firstStageInfo.numTasks() should be(2) - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds(0)).get updatedFirstStageInfo.numCompletedTasks() should be(2) updatedFirstStageInfo.numActiveTasks() should be(0) @@ -61,29 +60,29 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont sc = new SparkContext("local", "test", new SparkConf(false)) // Passing `null` 
should return jobs that were not run in a job group: val defaultJobGroupFuture = sc.parallelize(1 to 1000).countAsync() - val defaultJobGroupJobId = eventually(timeout(10 seconds)) { + val defaultJobGroupJobId = eventually(timeout(10.seconds)) { defaultJobGroupFuture.jobIds.head } - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { sc.statusTracker.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId)) } // Test jobs submitted in job groups: sc.setJobGroup("my-job-group", "description") sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq.empty) val firstJobFuture = sc.parallelize(1 to 1000).countAsync() - val firstJobId = eventually(timeout(10 seconds)) { + val firstJobId = eventually(timeout(10.seconds)) { firstJobFuture.jobIds.head } - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId)) } - val secondJobFuture = sc.parallelize(1 to 1000).countAsync() - val secondJobId = eventually(timeout(10 seconds)) { - secondJobFuture.jobIds.head + val secondJobFuture = sc.parallelize(1 to 1000).countAsync() + val secondJobId = eventually(timeout(10.seconds)) { + secondJobFuture.jobIds.head } - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be ( - Set(firstJobId, secondJobId)) + Set(firstJobId, secondJobId)) } } @@ -92,10 +91,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont sc.setJobGroup("my-job-group2", "description") sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) - val firstJobId = eventually(timeout(10 seconds)) { + val firstJobId = eventually(timeout(10.seconds)) { firstJobFuture.jobIds.head } - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId)) } } @@ -105,10 +104,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont sc.setJobGroup("my-job-group2", "description") sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999) - val firstJobId = eventually(timeout(10 seconds)) { + val firstJobId = eventually(timeout(10.seconds)) { firstJobFuture.jobIds.head } - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2 } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index 4ab000b53ad1..e3304be792af 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -23,7 +23,6 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.mutable import scala.collection.mutable.ListBuffer -import scala.language.postfixOps import com.codahale.metrics.Counter import com.google.common.cache.LoadingCache diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 39c5857b1345..d1902dc9f896 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ -import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.hdfs.DistributedFileSystem @@ -368,7 +367,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1 second), interval(10 millis)) { + eventually(timeout(1.second), interval(10.millis)) { provider.getConfig().keys should not contain ("HDFS State") } } finally { @@ -386,7 +385,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1 second), interval(10 millis)) { + eventually(timeout(1.second), interval(10.millis)) { verify(errorHandler).uncaughtException(any(), any()) } } finally { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 631a7cd9d5d7..dd39cb1c307b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -23,7 +23,6 @@ import java.util.zip.ZipInputStream import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.concurrent.duration._ -import scala.language.postfixOps import com.codahale.metrics.Counter import com.google.common.io.{ByteStreams, Files} @@ -349,8 +348,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // start initial job val d = sc.parallelize(1 to 10) d.count() - val stdInterval = interval(100 milliseconds) - val appId = eventually(timeout(20 seconds), stdInterval) { + val stdInterval = interval(100.milliseconds) + val appId = eventually(timeout(20.seconds), stdInterval) { val json = getContentAndCode("applications", port)._2.get val apps = parse(json).asInstanceOf[JArray].arr apps should have size 1 @@ -430,7 +429,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers d2.count() dumpLogDir("After second job") - val stdTimeout = timeout(10 seconds) + val stdTimeout = timeout(10.seconds) logDebug("waiting for UI to update") eventually(stdTimeout, stdInterval) { assert(2 === getNumJobs(""), diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 7cbe4e342eaa..3f1add309b8e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.io.Source -import scala.language.postfixOps import org.json4s._ import org.json4s.jackson.JsonMethods._ @@ -140,7 +139,7 @@ class MasterSuite extends SparkFunSuite val localCluster = new LocalSparkCluster(2, 2, 512, conf) localCluster.start() try { - eventually(timeout(5 seconds), interval(100 milliseconds)) { + eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json") .getLines().mkString("\n") val JArray(workers) = (parse(json) \ "workers") diff --git 
a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala index cac15a1dc441..5d130baffddd 100644 --- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.launcher import java.util.concurrent.TimeUnit import scala.concurrent.duration._ -import scala.language.postfixOps import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -53,13 +52,13 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers { .startApplication() try { - eventually(timeout(30 seconds), interval(100 millis)) { + eventually(timeout(30.seconds), interval(100.millis)) { handle.getAppId() should not be (null) } handle.stop() - eventually(timeout(30 seconds), interval(100 millis)) { + eventually(timeout(30.seconds), interval(100.millis)) { handle.getState() should be (SparkAppHandle.State.KILLED) } } finally { diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index acdf21df9a16..d38e3bf2306b 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -26,7 +26,6 @@ import scala.collection.mutable import scala.collection.JavaConverters._ import scala.concurrent.Await import scala.concurrent.duration._ -import scala.language.postfixOps import com.google.common.io.Files import org.mockito.Matchers.any @@ -78,7 +77,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) rpcEndpointRef.send("hello") - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { assert("hello" === message) } } @@ -99,7 +98,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely") try { rpcEndpointRef.send("hello") - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { assert("hello" === message) } } finally { @@ -179,7 +178,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { try { // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause val e = intercept[SparkException] { - rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp)) + rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1.millis, shortProp)) } // The SparkException cause should be a RpcTimeoutException with message indicating the // controlling timeout property @@ -235,7 +234,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { assert(e.getMessage === "Oops!") } } @@ -260,7 +259,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { env.stop(endpointRef) - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { assert(e.getMessage === "Oops!") } } @@ -281,7 +280,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { endpointRef.send("Foo") - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { assert(e.getMessage === "Oops!") } } @@ -302,7 
+301,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { // Calling `self` in `onStart` is fine assert(callSelfSuccessfully === true) } @@ -323,7 +322,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { endpointRef.send("Foo") - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { // Calling `self` in `receive` is fine assert(callSelfSuccessfully === true) } @@ -346,7 +345,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { env.stop(endpointRef) - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { // Calling `self` in `onStop` will return null, so selfOption will be None assert(selfOption == None) } @@ -375,7 +374,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { }.start() } - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(result == 1000) } @@ -400,7 +399,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { env.stop(endpointRef) env.stop(endpointRef) - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { // Calling stop twice should only trigger onStop once. assert(onStopCount == 1) } @@ -416,7 +415,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { }) val f = endpointRef.ask[String]("Hi") - val ack = ThreadUtils.awaitResult(f, 5 seconds) + val ack = ThreadUtils.awaitResult(f, 5.seconds) assert("ack" === ack) env.stop(endpointRef) @@ -436,7 +435,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely") try { val f = rpcEndpointRef.ask[String]("hello") - val ack = ThreadUtils.awaitResult(f, 5 seconds) + val ack = ThreadUtils.awaitResult(f, 5.seconds) assert("ack" === ack) } finally { anotherEnv.shutdown() @@ -455,7 +454,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val f = endpointRef.ask[String]("Hi") val e = intercept[SparkException] { - ThreadUtils.awaitResult(f, 5 seconds) + ThreadUtils.awaitResult(f, 5.seconds) } assert("Oops" === e.getCause.getMessage) @@ -477,7 +476,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { try { val f = rpcEndpointRef.ask[String]("hello") val e = intercept[SparkException] { - ThreadUtils.awaitResult(f, 5 seconds) + ThreadUtils.awaitResult(f, 5.seconds) } assert("Oops" === e.getCause.getMessage) } finally { @@ -529,14 +528,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // Send a message to set up the connection serverRefInServer2.send("hello") - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(events.contains(("onConnected", serverEnv2.address))) } serverEnv2.shutdown() serverEnv2.awaitTermination() - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(events.contains(("onConnected", serverEnv2.address))) assert(events.contains(("onDisconnected", serverEnv2.address))) } @@ -557,7 +556,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // Send a message to set up the connection 
serverRefInClient.send("hello") - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { // We don't know the exact client address but at least we can verify the message type assert(events.asScala.map(_._1).exists(_ == "onConnected")) } @@ -565,7 +564,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { clientEnv.shutdown() clientEnv.awaitTermination() - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { // We don't know the exact client address but at least we can verify the message type assert(events.asScala.map(_._1).exists(_ == "onConnected")) assert(events.asScala.map(_._1).exists(_ == "onDisconnected")) @@ -588,14 +587,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // Send a message to set up the connection serverRefInClient.send("hello") - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(events.contains(("onConnected", serverEnv.address))) } serverEnv.shutdown() serverEnv.awaitTermination() - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(events.contains(("onConnected", serverEnv.address))) assert(events.contains(("onDisconnected", serverEnv.address))) } @@ -623,7 +622,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { try { val f = rpcEndpointRef.ask[String]("hello") val e = intercept[SparkException] { - ThreadUtils.awaitResult(f, 1 seconds) + ThreadUtils.awaitResult(f, 1.seconds) } assert(e.getCause.isInstanceOf[NotSerializableException]) } finally { @@ -656,7 +655,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { }) val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "send-authentication") rpcEndpointRef.send("hello") - eventually(timeout(5 seconds), interval(10 millis)) { + eventually(timeout(5.seconds), interval(10.millis)) { assert("hello" === message) } } finally { @@ -738,8 +737,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) - val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout") - val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout") + val longTimeout = new RpcTimeout(1.second, "spark.rpc.long.timeout") + val shortTimeout = new RpcTimeout(10.millis, "spark.rpc.short.timeout") // Ask with immediate response, should complete successfully val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout) @@ -764,7 +763,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // once the future is complete to verify addMessageIfTimeout was invoked val reply3 = intercept[RpcTimeoutException] { - Await.result(fut3, 2000 millis) + Await.result(fut3, 2000.millis) }.getMessage // scalastyle:on awaitresult diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 83288db92bb4..f0477d4615f2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -21,7 +21,6 @@ import java.io.File import java.util.concurrent.TimeoutException import scala.concurrent.duration._ -import scala.language.postfixOps import org.apache.hadoop.mapred.{JobConf, OutputCommitter, TaskAttemptContext, 
TaskAttemptID} import org.mockito.Matchers @@ -159,7 +158,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { // It's an error if the job completes successfully even though no committer was authorized, // so throw an exception if the job was allowed to complete. val e = intercept[SparkException] { - ThreadUtils.awaitResult(futureAction, 5 seconds) + ThreadUtils.awaitResult(futureAction, 5.seconds) } assert(e.getCause.isInstanceOf[TimeoutException]) assert(tempDir.list().size === 0) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 9e472f900b65..d4b46e28f29b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -23,7 +23,6 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.control.NonFatal import com.google.common.util.concurrent.MoreExecutors @@ -61,7 +60,7 @@ private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: Task sparkEnv.blockManager.master.removeBlock(blockId) // removeBlock is asynchronous. Need to wait it's removed successfully try { - eventually(timeout(3 seconds), interval(200 milliseconds)) { + eventually(timeout(3.seconds), interval(200.milliseconds)) { assert(!sparkEnv.blockManager.master.contains(blockId)) } removeBlockSuccessfully = true @@ -248,4 +247,3 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local } } - diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 31687e614731..2615a09870b7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.storage import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.implicitConversions -import scala.language.postfixOps import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfter, Matchers} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 8077a1b9414e..7b5381c5c72a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.Future import scala.language.implicitConversions -import scala.language.postfixOps import scala.reflect.ClassTag import org.mockito.{Matchers => mc} @@ -255,19 +254,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeBlock("a2-to-remove") master.removeBlock("a3-to-remove") - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(!store.hasLocalBlock("a1-to-remove")) master.getLocations("a1-to-remove") should have size 0 } - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { 
assert(!store.hasLocalBlock("a2-to-remove")) master.getLocations("a2-to-remove") should have size 0 } - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(store.hasLocalBlock("a3-to-remove")) master.getLocations("a3-to-remove") should have size 0 } - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { val memStatus = master.getMemoryStatus.head._2 memStatus._1 should equal (40000L) memStatus._2 should equal (40000L) @@ -285,15 +284,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = false) - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { store.getSingleAndReleaseLock(rdd(0, 0)) should be (None) master.getLocations(rdd(0, 0)) should have size 0 } - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { store.getSingleAndReleaseLock(rdd(0, 1)) should be (None) master.getLocations(rdd(0, 1)) should have size 0 } - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { store.getSingleAndReleaseLock("nonrddblock") should not be (None) master.getLocations("nonrddblock") should have size (1) } @@ -358,7 +357,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // remove broadcast 1 block from both the stores asynchronously // and verify all broadcast 1 blocks have been removed master.removeBroadcast(1, removeFromMaster = true, blocking = false) - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(!driverStore.hasLocalBlock(broadcast1BlockId)) assert(!executorStore.hasLocalBlock(broadcast1BlockId)) } @@ -366,7 +365,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // remove broadcast 2 from both the stores asynchronously // and verify all broadcast 2 blocks have been removed master.removeBroadcast(2, removeFromMaster = true, blocking = false) - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(!driverStore.hasLocalBlock(broadcast2BlockId)) assert(!driverStore.hasLocalBlock(broadcast2BlockId2)) assert(!executorStore.hasLocalBlock(broadcast2BlockId)) @@ -870,7 +869,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } // Make sure get a1 doesn't hang and returns None. 
- failAfter(1 second) { + failAfter(1.second) { assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store") } } diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index 145d432afe85..c11de826677e 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.storage import java.nio.ByteBuffer import scala.language.implicitConversions -import scala.language.postfixOps import scala.language.reflectiveCalls import scala.reflect.ClassTag diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala index 6f7dddd4f760..de6f436e3a58 100644 --- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch} import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts @@ -42,7 +41,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() (1 to 100).foreach(eventLoop.post) - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert((1 to 100) === buffer.asScala.toSeq) } eventLoop.stop() @@ -77,7 +76,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(e === receivedError) } eventLoop.stop() @@ -99,7 +98,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(e === receivedError) assert(eventLoop.isActive) } @@ -154,7 +153,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { }.start() } - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(threadNum * eventsFromEachThread === receivedEventsCount) } eventLoop.stop() @@ -179,7 +178,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - failAfter(5 seconds) { + failAfter(5.seconds) { // Wait until we enter `onReceive` onReceiveLatch.await() eventLoop.stop() @@ -200,7 +199,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(!eventLoop.isActive) } } @@ -224,7 +223,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } } eventLoop.start() - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(!eventLoop.isActive) } assert(onStopCalled) @@ -247,7 +246,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(!eventLoop.isActive) } assert(onStopCalled) @@ -271,7 +270,7 @@ class EventLoopSuite extends SparkFunSuite with 
Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5 seconds), interval(5 millis)) { + eventually(timeout(5.seconds), interval(5.millis)) { assert(!eventLoop.isActive) } assert(onStopCalled) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 1c93079497f6..2adfa9d86c26 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually._ @@ -127,7 +126,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with clock.advance(batchDuration.milliseconds) // The eventually is required to ensure that all data in the batch has been processed. - eventually(timeout(10 seconds), interval(100 milliseconds)) { + eventually(timeout(10.seconds), interval(100.milliseconds)) { val flattenOutput = outputQueue.asScala.toSeq.flatten val headers = flattenOutput.map(_.event.getHeaders.asScala.map { case (key, value) => (key.toString, value.toString) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 7bac1cc4b0ae..c0b8dd09899d 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import org.jboss.netty.channel.ChannelPipeline import org.jboss.netty.channel.socket.SocketChannel @@ -55,11 +54,11 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w try { val outputQueue = startContext(utils.getTestPort(), testCompression) - eventually(timeout(10 seconds), interval(100 milliseconds)) { + eventually(timeout(10.seconds), interval(100.milliseconds)) { utils.writeInput(input.asJava, testCompression) } - eventually(timeout(10 seconds), interval(100 milliseconds)) { + eventually(timeout(10.seconds), interval(100.milliseconds)) { val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event } outputEvents.foreach { event => diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 19192e4b9594..cb9db232b59e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.language.postfixOps import scala.util.control.NonFatal import kafka.admin.AdminUtils @@ -275,4 +274,3 @@ private[kafka010] class KafkaTestUtils extends Logging { } } } - diff --git 
a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index b1d90b8a82d5..863fc60bafa0 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.Random import org.apache.kafka.clients.consumer._ @@ -238,7 +237,7 @@ class DirectKafkaStreamSuite // Send some initial messages before starting context kafkaTestUtils.sendMessages(topic, data) - eventually(timeout(10 seconds), interval(20 milliseconds)) { + eventually(timeout(10.seconds), interval(20.milliseconds)) { assert(getLatestOffset() > 3) } val offsetBeforeStart = getLatestOffset() @@ -266,7 +265,7 @@ class DirectKafkaStreamSuite ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { collectedData.contains("b") } assert(!collectedData.contains("a")) @@ -288,7 +287,7 @@ class DirectKafkaStreamSuite // Send some initial messages before starting context kafkaTestUtils.sendMessages(topic, data) - eventually(timeout(10 seconds), interval(20 milliseconds)) { + eventually(timeout(10.seconds), interval(20.milliseconds)) { assert(getLatestOffset() >= 10) } val offsetBeforeStart = getLatestOffset() @@ -318,7 +317,7 @@ class DirectKafkaStreamSuite ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { collectedData.contains("b") } assert(!collectedData.contains("a")) @@ -366,7 +365,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum) } @@ -405,7 +404,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum) } ssc.stop() @@ -428,7 +427,7 @@ class DirectKafkaStreamSuite def sendDataAndWaitForReceive(data: Seq[Int]) { val strings = data.map { _.toString} kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap) - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { assert(strings.forall { collectedData.contains }) } } @@ -595,7 +594,7 @@ class DirectKafkaStreamSuite estimator.updateRate(rate) // Set a new rate. // Expect blocks of data equal to "rate", scaled by the interval length in secs. val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) - eventually(timeout(5.seconds), interval(10 milliseconds)) { + eventually(timeout(5.seconds), interval(10.milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. 
assert(collectedData.asScala.exists(_.size == expectedSize), diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index abfd7aad4c5c..03c9ca7524e5 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.language.postfixOps import scala.util.control.NonFatal import kafka.admin.AdminUtils @@ -274,4 +273,3 @@ private[kafka] class KafkaTestUtils extends Logging { } } } - diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index ab1c5055a253..aa695a4cc7c4 100644 --- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata @@ -158,7 +157,7 @@ class DirectKafkaStreamSuite // Send some initial messages before starting context kafkaTestUtils.sendMessages(topic, data) - eventually(timeout(10 seconds), interval(20 milliseconds)) { + eventually(timeout(10.seconds), interval(20.milliseconds)) { assert(getLatestOffset() > 3) } val offsetBeforeStart = getLatestOffset() @@ -180,7 +179,7 @@ class DirectKafkaStreamSuite ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { collectedData.contains("b") } assert(!collectedData.contains("a")) @@ -203,7 +202,7 @@ class DirectKafkaStreamSuite // Send some initial messages before starting context kafkaTestUtils.sendMessages(topic, data) - eventually(timeout(10 seconds), interval(20 milliseconds)) { + eventually(timeout(10.seconds), interval(20.milliseconds)) { assert(getLatestOffset() >= 10) } val offsetBeforeStart = getLatestOffset() @@ -226,7 +225,7 @@ class DirectKafkaStreamSuite ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { collectedData.contains("b") } assert(!collectedData.contains("a")) @@ -274,7 +273,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum) } @@ -317,7 +316,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(10.seconds), interval(50.milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum) } ssc.stop() diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala 
index 6a35ac14a8f6..0dfedadbaf0e 100644 --- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.streaming.kafka import scala.collection.mutable import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.Random import kafka.serializer.StringDecoder @@ -77,7 +76,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter ssc.start() - eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + eventually(timeout(10000.milliseconds), interval(100.milliseconds)) { assert(result.synchronized { sent === result }) } } diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 7b9aee39ffb7..c72e582e2bcb 100644 --- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -21,7 +21,6 @@ import java.io.File import scala.collection.mutable import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.Random import kafka.serializer.StringDecoder @@ -105,7 +104,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite } ssc.start() - eventually(timeout(20000 milliseconds), interval(200 milliseconds)) { + eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { // A basic process verification for ReliableKafkaReceiver. // Verify whether received message number is equal to the sent message number. assert(data.size === result.size) @@ -132,7 +131,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite stream.foreachRDD(_ => Unit) ssc.start() - eventually(timeout(20000 milliseconds), interval(100 milliseconds)) { + eventually(timeout(20000.milliseconds), interval(100.milliseconds)) { // Verify the offset for each group/topic to see whether they are equal to the expected one. 
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) } } diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala index e1499a822099..5580e1f598b6 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{ExecutorService, TimeoutException} import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ -import scala.language.postfixOps import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer import org.mockito.Matchers._ diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 0e71bf9b8433..2111c23889a9 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.streaming.kinesis import scala.collection.mutable import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.Random import com.amazonaws.regions.RegionUtils diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 0a1622bca0f4..2f5bd4ed4ff6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -17,7 +17,6 @@ package org.apache.spark.graphx.lib -import scala.language.postfixOps import scala.reflect.ClassTag import org.apache.spark.graphx._ @@ -109,7 +108,7 @@ object PageRank extends Logging { require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" + s" to [0, 1], but got ${resetProb}") - val personalized = srcId isDefined + val personalized = srcId.isDefined val src: VertexId = srcId.getOrElse(-1L) // Initialize the PageRank graph with each edge attribute having diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala index 898a09e51636..42c5bcdd39f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala @@ -19,7 +19,6 @@ package org.apache.spark.mllib.util import java.{util => ju} -import scala.language.postfixOps import scala.util.Random import org.apache.spark.SparkContext diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala index 12e98565dcef..3d622d42f408 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala @@ -27,7 +27,6 @@ import java.util import scala.concurrent.duration._ import scala.io.Source import scala.language.implicitConversions -import scala.language.postfixOps import com.google.common.io.Files import org.mockito.Matchers.anyString diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala index e1675c95907a..a10c0e39eb68 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.types -import scala.language.postfixOps - import org.scalatest.PrivateMethodTester import org.apache.spark.SparkFunSuite diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index f42402e1cc7d..d42ea71f1aa4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql import scala.collection.mutable.HashSet import scala.concurrent.duration._ -import scala.language.postfixOps import org.scalatest.concurrent.Eventually._ @@ -237,7 +236,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext "Eagerly cached in-memory table should have already been materialized") spark.catalog.uncacheTable("testCacheTable") - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } @@ -254,7 +253,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext "Eagerly cached in-memory table should have already been materialized") spark.catalog.uncacheTable("testCacheTable") - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } @@ -275,7 +274,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext "Lazily cached in-memory table should have been materialized") spark.catalog.uncacheTable("testData") - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } @@ -365,7 +364,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext System.gc() - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty }, "batchStats accumulators should be cleared after GC when uncacheTable") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 905da554f1cf..f0bb940cd397 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -21,7 +21,6 @@ import java.io.File import java.nio.charset.StandardCharsets import java.util.UUID -import scala.language.postfixOps import scala.util.Random import org.scalatest.Matchers._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index ddc4dcd2395b..b117fbd0bcf9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import scala.language.postfixOps - import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
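// The import removals in these sql suites are safe because the files no longer
// contain any bare postfix operators, so scala.language.postfixOps is dead
// weight. A hedged build sketch (an assumption for illustration, not part of
// this patch): compiling with -feature makes scalac warn on any postfix use
// that lacks the import, so an accidental regression is caught at compile time.
// build.sbt fragment:
scalacOptions ++= Seq("-feature")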
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.scalalang.typed diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index ac9f6c2f3853..8d5e9645df89 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import scala.language.postfixOps - import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala index 6aa3d3fe808b..f8d4c61967f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import scala.language.postfixOps - import org.apache.spark.sql.test.SharedSQLContext case class IntClass(value: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 7e3b7b63d8b1..27d7ec14da6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} -import scala.language.postfixOps - import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.streaming.MemoryStream diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 9b689f01b8d3..ad4ff7ca4e36 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.parallel.ExecutionContextTaskSupport import scala.concurrent.{Await, ExecutionContext, Future} -import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -188,7 +187,7 @@ private[streaming] class FileBasedWriteAheadLog( val f = Future { deleteFile(logInfo) }(executionContext) if (waitForCompletion) { import scala.concurrent.duration._ - Await.ready(f, 1 second) + Await.ready(f, 1.second) } } catch { case e: RejectedExecutionException => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 00d506c2f18b..9ecfa48091a0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.language.postfixOps import com.google.common.io.Files import org.apache.hadoop.fs.Path diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index e97427991bf9..dc8e6a742904 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ -import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfter, Matchers} @@ -186,7 +185,7 @@ class ReceivedBlockHandlerSuite val cleanupThreshTime = 3000L handler.cleanupOldBlocks(cleanupThreshTime) - eventually(timeout(10000 millis), interval(10 millis)) { + eventually(timeout(10000.millis), interval(10.millis)) { getWriteAheadLogFiles().size should be < preCleanupLogFiles.size } } @@ -413,4 +412,3 @@ class ReceivedBlockHandlerSuite private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong) } - diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala index a2dbae149f31..3faecb821aff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler import java.util.concurrent.CountDownLatch import scala.concurrent.duration._ -import scala.language.postfixOps import org.scalatest.concurrent.Eventually._ @@ -69,7 +68,7 @@ class JobGeneratorSuite extends TestSuiteBase { val longBatchNumber = 3 // 3rd batch will take a long time val longBatchTime = longBatchNumber * batchDuration.milliseconds - val testTimeout = timeout(10 seconds) + val testTimeout = timeout(10.seconds) val inputStream = ssc.receiverStream(new TestReceiver) inputStream.foreachRDD((rdd: RDD[Int], time: Time) => { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala index a6a4fec3ba9e..9dd21efb9152 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala @@ -19,8 +19,6 @@ package org.apache.spark.deploy.yarn import java.security.PrivilegedExceptionAction import java.util.concurrent.{Executors, TimeUnit} -import scala.language.postfixOps - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 9c3b18e4ec5f..01bd22d2331d 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import com.google.common.io.Files import org.apache.commons.lang3.SerializationUtils diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 1ccd7e5993f5..962f7fcdfbb3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -24,7 +24,6 @@ import java.util.{HashMap => JHashMap} import scala.collection.mutable import scala.concurrent.duration._ -import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.conf.YarnConfiguration diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index e123e7854104..1936bb72e220 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -23,7 +23,6 @@ import java.util.EnumSet import scala.annotation.tailrec import scala.concurrent.duration._ -import scala.language.postfixOps import org.apache.hadoop.fs.Path import org.apache.hadoop.service.ServiceStateException From bbf1bf230fdfd6fa48a3d9673794da9f2702140e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 15:34:31 -0700 Subject: [PATCH 05/11] Fix accidental changes to StatusTrackerSuite --- .../test/scala/org/apache/spark/StatusTrackerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index d0d2531ac71f..cd8503d0ede4 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -76,13 +76,13 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont eventually(timeout(10.seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId)) } - val.secondJobFuture = sc.parallelize(1 to 1000).countAsync() - val.secondJobId = eventually(timeout(10.seconds)) { - .secondJobFuture.jobIds.head + val secondJobFuture = sc.parallelize(1 to 1000).countAsync() + val secondJobId = eventually(timeout(10.seconds)) { + secondJobFuture.jobIds.head } eventually(timeout(10.seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be ( - Set(firstJobId,.secondJobId)) + Set(firstJobId, secondJobId)) } } From c6383bfae30f8ab2717bf8166e042fa96b817dac Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 15:45:39 -0700 Subject: [PATCH 06/11] fix blockmanager too --- .../spark/storage/BlockManagerReplicationSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 945771587d6c..52ee8f0628b7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -292,7 +292,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite // Add another normal block manager and test that 2x replication works makeBlockManager(10000, "anotherStore") - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(replicateAndGetNumCopies("a2") === 2) } } @@ -317,14 +317,14 @@ class 
BlockManagerReplicationSuite extends SparkFunSuite // Add another store, 3x replication should work now, 4x replication should only replicate 3x val newStore1 = makeBlockManager(storeSize, s"newstore1") - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(replicateAndGetNumCopies("a3", 3) === 3) } assert(replicateAndGetNumCopies("a4", 4) === 3) // Add another store, 4x replication should work now val newStore2 = makeBlockManager(storeSize, s"newstore2") - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(replicateAndGetNumCopies("a5", 4) === 4) } @@ -340,7 +340,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") } - eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { assert(replicateAndGetNumCopies("a7", 3) === 3) } } From 4dbd5fa4a69b378ac3fa6103176cf44dfa44bf86 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 15:53:41 -0700 Subject: [PATCH 07/11] Add missing . for seconds in CachedTableSuite --- .../src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index d42ea71f1aa4..239f44ab3889 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -220,7 +220,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext sql("UNCACHE TABLE testData") assert(!spark.catalog.isCached("testData"), "Table 'testData' should not be cached") - eventually(timeout(10 seconds)) { + eventually(timeout(10.seconds)) { assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } From 89ee6aeb56d059c11f77162fc84c1fcb34748798 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 18:10:17 -0700 Subject: [PATCH 08/11] Clean up some more things that were not in the standard build --- .../streaming/kinesis/KinesisCheckpointerSuite.scala | 10 +++++----- .../spark/streaming/kinesis/KinesisStreamSuite.scala | 6 +++--- .../spark/deploy/yarn/AMDelegationTokenRenewer.scala | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala index 5580e1f598b6..c83c06ca7872 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala @@ -88,7 +88,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) clock.advance(5 * checkpointInterval.milliseconds) - eventually(timeout(1 second)) { + eventually(timeout(1.second)) { verify(checkpointerMock, times(1)).checkpoint(seqNum) verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) } @@ -109,7 +109,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase kinesisCheckpointer.setCheckpointer(shardId,
checkpointerMock) clock.advance(checkpointInterval.milliseconds * 5) - eventually(timeout(1 second)) { + eventually(timeout(1.second)) { verify(checkpointerMock, atMost(1)).checkpoint(anyString()) } } @@ -132,7 +132,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) clock.advance(checkpointInterval.milliseconds) - eventually(timeout(1 second)) { + eventually(timeout(1.second)) { verify(checkpointerMock, times(1)).checkpoint(anyString()) } // don't block test thread @@ -140,11 +140,11 @@ class KinesisCheckpointerSuite extends TestSuiteBase ExecutionContext.global) intercept[TimeoutException] { - Await.ready(f, 50 millis) + Await.ready(f, 50.millis) } clock.advance(checkpointInterval.milliseconds / 2) - eventually(timeout(1 second)) { + eventually(timeout(1.second)) { verify(checkpointerMock, times(2)).checkpoint(anyString()) } } diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 2111c23889a9..5da7c411df2e 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -187,7 +187,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun ssc.start() val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { + eventually(timeout(120.seconds), interval(10.second)) { testUtils.pushData(testData, aggregateTestData) assert(collected.synchronized { collected === testData.toSet }, "\nData received does not match data sent") @@ -215,7 +215,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun ssc.start() val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { + eventually(timeout(120.seconds), interval(10.second)) { testUtils.pushData(testData, aggregateTestData) val modData = testData.map(_ + 5) assert(collected.synchronized { collected === modData.toSet }, @@ -259,7 +259,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun // Run until there are at least 10 batches with some data in them // If this times out because numBatchesWithData is empty, then its likely that foreachRDD // function failed with exceptions, and nothing got added to `collectedData` - eventually(timeout(2 minutes), interval(1 seconds)) { + eventually(timeout(2.minutes), interval(1.seconds)) { testUtils.pushData(1 to 5, aggregateTestData) assert(isCheckpointPresent && numBatchesWithData > 10) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala index 9dd21efb9152..310a7a6b05e7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala @@ -126,7 +126,7 @@ private[yarn] class AMDelegationTokenRenewer( try { val remoteFs = FileSystem.get(freshHadoopConf) val credentialsPath = new Path(credentialsFile) - val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis + val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis hadoopUtil.listFilesSorted( remoteFs, credentialsPath.getParent, credentialsPath.getName, 
SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) From 91aabd299ad14d06f8d6259f5c83fc60be87ec96 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 18:26:20 -0700 Subject: [PATCH 09/11] Fix some of the postfix operations in the Yarn suites left out of the default build --- .../org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 4 ++-- .../apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 01bd22d2331d..48f7546dd089 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -166,7 +166,7 @@ abstract class BaseYarnClusterSuite val handle = launcher.startApplication() try { - eventually(timeout(2 minutes), interval(1 second)) { + eventually(timeout(2.minutes), interval(1.second)) { assert(handle.getState().isFinal()) } } finally { diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 79443596346d..d718458a5085 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -175,7 +175,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { .startApplication() try { - eventually(timeout(30 seconds), interval(100 millis)) { + eventually(timeout(30.seconds), interval(100.millis)) { handle.getState() should be (SparkAppHandle.State.RUNNING) } @@ -183,7 +183,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { handle.getAppId() should startWith ("application_") handle.stop() - eventually(timeout(30 seconds), interval(100 millis)) { + eventually(timeout(30.seconds), interval(100.millis)) { handle.getState() should be (SparkAppHandle.State.KILLED) } } finally { diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 1936bb72e220..40818b6067fa 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -302,7 +302,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd val execStateFile2 = s2.registeredExecutorFile recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString) - eventually(timeout(10 seconds), interval(5 millis)) { + eventually(timeout(10.seconds), interval(5.millis)) { assert(!execStateFile.exists()) } From df67cdec42d52f87c55a52b2d1ceb785ee0bf443 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 30 Jul 2016 00:32:38 -0700 Subject: [PATCH 10/11] Switch back to Try to capture things besides non-zero exit values --- core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 4 +++- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 0f2d4363d2e0..c40f4fe08acc 100644 ---
a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -22,6 +22,7 @@ import java.io.File import scala.collection.Map import scala.io.Codec import scala.sys.process._ +import scala.util.Try import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{LongWritable, Text} @@ -213,7 +214,8 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } def testCommandAvailable(command: String): Boolean = { - Process(command).run().exitValue() == 0 + val attempt = Try(Process(command).run().exitValue()) + attempt.isSuccess && attempt.get == 0 } def testExportInputFile(varName: String) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index f6df4b9bc97d..c599cdebd628 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution import java.sql.{Date, Timestamp} import scala.sys.process.Process +import scala.util.Try import org.apache.hadoop.fs.Path @@ -1773,6 +1774,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } def testCommandAvailable(command: String): Boolean = { - Process(command).run().exitValue() == 0 + val attempt = Try(Process(command).run().exitValue()) + attempt.isSuccess && attempt.get == 0 } } From e4d8452f540140b7ade22a472b5334dae88df559 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 8 Aug 2016 11:39:47 -0700 Subject: [PATCH 11/11] Update to keep postfix time operations --- .../spark/deploy/FaultToleranceTest.scala | 17 +++--- .../apache/spark/scheduler/DAGScheduler.scala | 3 +- .../org/apache/spark/SparkConfSuite.scala | 5 +- .../org/apache/spark/StatusTrackerSuite.scala | 29 +++++----- .../history/FsHistoryProviderSuite.scala | 5 +- .../deploy/history/HistoryServerSuite.scala | 7 +-- .../spark/deploy/master/MasterSuite.scala | 3 +- .../spark/launcher/LauncherBackendSuite.scala | 5 +- .../org/apache/spark/rpc/RpcEnvSuite.scala | 53 ++++++++++--------- .../OutputCommitCoordinatorSuite.scala | 3 +- .../scheduler/TaskResultGetterSuite.scala | 4 +- .../BlockManagerReplicationSuite.scala | 9 ++-- .../spark/storage/BlockManagerSuite.scala | 21 ++++---- .../apache/spark/util/EventLoopSuite.scala | 19 +++---- .../flume/FlumePollingStreamSuite.scala | 3 +- .../streaming/flume/FlumeStreamSuite.scala | 5 +- .../kafka010/DirectKafkaStreamSuite.scala | 17 +++--- .../kafka/DirectKafkaStreamSuite.scala | 13 ++--- .../streaming/kafka/KafkaStreamSuite.scala | 3 +- .../kafka/ReliableKafkaStreamSuite.scala | 5 +- .../kinesis/KinesisCheckpointerSuite.scala | 11 ++-- .../kinesis/KinesisStreamSuite.scala | 7 +-- .../apache/spark/sql/CachedTableSuite.scala | 11 ++-- .../util/FileBasedWriteAheadLog.scala | 3 +- .../streaming/ReceivedBlockHandlerSuite.scala | 4 +- .../scheduler/JobGeneratorSuite.scala | 3 +- .../deploy/yarn/BaseYarnClusterSuite.scala | 3 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 5 +- .../yarn/YarnShuffleServiceSuite.scala | 3 +- 29 files changed, 155 insertions(+), 124 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index ff8b10c7cba3..79f4d06c8460 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala 
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.{Future, Promise} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ +import scala.language.postfixOps import scala.sys.process._ import org.json4s._ @@ -112,7 +113,7 @@ private object FaultToleranceTest extends App with Logging { assertValidClusterState() killLeader() - delay(30.seconds) + delay(30 seconds) assertValidClusterState() createClient() assertValidClusterState() @@ -126,12 +127,12 @@ private object FaultToleranceTest extends App with Logging { killLeader() addMasters(1) - delay(30.seconds) + delay(30 seconds) assertValidClusterState() killLeader() addMasters(1) - delay(30.seconds) + delay(30 seconds) assertValidClusterState() } @@ -156,7 +157,7 @@ private object FaultToleranceTest extends App with Logging { killLeader() workers.foreach(_.kill()) workers.clear() - delay(30.seconds) + delay(30 seconds) addWorkers(2) assertValidClusterState() } @@ -174,7 +175,7 @@ private object FaultToleranceTest extends App with Logging { (1 to 3).foreach { _ => killLeader() - delay(30.seconds) + delay(30 seconds) assertValidClusterState() assertTrue(getLeader == masters.head) addMasters(1) @@ -264,7 +265,7 @@ private object FaultToleranceTest extends App with Logging { } // Avoid waiting indefinitely (e.g., we could register but get no executors). - assertTrue(ThreadUtils.awaitResult(f, 120.seconds)) + assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) } /** @@ -317,7 +318,7 @@ private object FaultToleranceTest extends App with Logging { } try { - assertTrue(ThreadUtils.awaitResult(f, 120.seconds)) + assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) } catch { case e: TimeoutException => logError("Master states: " + masters.map(_.state)) @@ -421,7 +422,7 @@ private object SparkDocker { } dockerCmd.run(ProcessLogger(findIpAndLog _)) - val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds) + val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds) val dockerId = Docker.getLastProcessId (ip, dockerId, outFile) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f684bedb040b..4eb7c81f9e8c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -27,6 +27,7 @@ import scala.collection.Map import scala.collection.mutable.{HashMap, HashSet, Stack} import scala.concurrent.duration._ import scala.language.existentials +import scala.language.postfixOps import scala.util.control.NonFatal import org.apache.commons.lang3.SerializationUtils @@ -232,7 +233,7 @@ class DAGScheduler( blockManagerId: BlockManagerId): Boolean = { listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates)) blockManagerMaster.driverEndpoint.askWithRetry[Boolean]( - BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600.seconds, "BlockManagerHeartbeat")) + BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat")) } /** diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 1b17894e143e..a883d1b57e52 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -21,6 +21,7 @@ import java.util.concurrent.{Executors, 
TimeUnit} import scala.collection.JavaConverters._ import scala.concurrent.duration._ +import scala.language.postfixOps import scala.util.{Random, Try} import com.esotericsoftware.kryo.Kryo @@ -261,10 +262,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(RpcUtils.retryWaitMs(conf) === 2L) conf.set("spark.akka.askTimeout", "3") - assert(RpcUtils.askRpcTimeout(conf).duration === (3.seconds)) + assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds)) conf.set("spark.akka.lookupTimeout", "4") - assert(RpcUtils.lookupRpcTimeout(conf).duration === (4.seconds)) + assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds)) } test("SPARK-13727") { diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index cd8503d0ede4..5483f2b8434a 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark import scala.concurrent.duration._ import scala.language.implicitConversions +import scala.language.postfixOps import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -30,25 +31,25 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont test("basic status API usage") { sc = new SparkContext("local", "test", new SparkConf(false)) val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync() - val jobId: Int = eventually(timeout(10.seconds)) { + val jobId: Int = eventually(timeout(10 seconds)) { val jobIds = jobFuture.jobIds jobIds.size should be(1) jobIds.head } - val jobInfo = eventually(timeout(10.seconds)) { + val jobInfo = eventually(timeout(10 seconds)) { sc.statusTracker.getJobInfo(jobId).get } jobInfo.status() should not be FAILED val stageIds = jobInfo.stageIds() stageIds.size should be(2) - val firstStageInfo = eventually(timeout(10.seconds)) { + val firstStageInfo = eventually(timeout(10 seconds)) { sc.statusTracker.getStageInfo(stageIds(0)).get } firstStageInfo.stageId() should be(stageIds(0)) firstStageInfo.currentAttemptId() should be(0) firstStageInfo.numTasks() should be(2) - eventually(timeout(10.seconds)) { + eventually(timeout(10 seconds)) { val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds(0)).get updatedFirstStageInfo.numCompletedTasks() should be(2) updatedFirstStageInfo.numActiveTasks() should be(0) @@ -60,27 +61,27 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont sc = new SparkContext("local", "test", new SparkConf(false)) // Passing `null` should return jobs that were not run in a job group: val defaultJobGroupFuture = sc.parallelize(1 to 1000).countAsync() - val defaultJobGroupJobId = eventually(timeout(10.seconds)) { + val defaultJobGroupJobId = eventually(timeout(10 seconds)) { defaultJobGroupFuture.jobIds.head } - eventually(timeout(10.seconds)) { + eventually(timeout(10 seconds)) { sc.statusTracker.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId)) } // Test jobs submitted in job groups: sc.setJobGroup("my-job-group", "description") sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq.empty) val firstJobFuture = sc.parallelize(1 to 1000).countAsync() - val firstJobId = eventually(timeout(10.seconds)) { + val firstJobId = eventually(timeout(10 seconds)) { firstJobFuture.jobIds.head } - eventually(timeout(10.seconds)) { + eventually(timeout(10 seconds)) { 
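// Looking back at patch 10's testCommandAvailable change: exitValue() blocks
// until the process exits and returns its status code, but launching a missing
// binary throws an IOException rather than returning non-zero, so a plain
// exit-code check would blow up instead of answering "not available". Wrapping
// the call in scala.util.Try folds both failure modes into one Boolean, which
// is what the patch does; reproduced here as a self-contained sketch (the
// wrapping object is illustrative):
import scala.sys.process.Process
import scala.util.Try

object CommandCheckSketch {
  def testCommandAvailable(command: String): Boolean = {
    // Try captures the IOException thrown when `command` does not exist;
    // the exit-code check handles commands that start but then fail.
    val attempt = Try(Process(command).run().exitValue())
    attempt.isSuccess && attempt.get == 0
  }
}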
sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId)) } val secondJobFuture = sc.parallelize(1 to 1000).countAsync() - val secondJobId = eventually(timeout(10.seconds)) { + val secondJobId = eventually(timeout(10 seconds)) { secondJobFuture.jobIds.head } - eventually(timeout(10.seconds)) { + eventually(timeout(10 seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be ( Set(firstJobId, secondJobId)) } @@ -91,10 +92,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont sc.setJobGroup("my-job-group2", "description") sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) - val firstJobId = eventually(timeout(10.seconds)) { + val firstJobId = eventually(timeout(10 seconds)) { firstJobFuture.jobIds.head } - eventually(timeout(10.seconds)) { + eventually(timeout(10 seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId)) } } @@ -104,10 +105,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont sc.setJobGroup("my-job-group2", "description") sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999) - val firstJobId = eventually(timeout(10.seconds)) { + val firstJobId = eventually(timeout(10 seconds)) { firstJobFuture.jobIds.head } - eventually(timeout(10.seconds)) { + eventually(timeout(10 seconds)) { sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2 } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d1902dc9f896..39c5857b1345 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ +import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.hdfs.DistributedFileSystem @@ -367,7 +368,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1.second), interval(10.millis)) { + eventually(timeout(1 second), interval(10 millis)) { provider.getConfig().keys should not contain ("HDFS State") } } finally { @@ -385,7 +386,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1.second), interval(10.millis)) { + eventually(timeout(1 second), interval(10 millis)) { verify(errorHandler).uncaughtException(any(), any()) } } finally { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index dd39cb1c307b..631a7cd9d5d7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -23,6 +23,7 @@ import java.util.zip.ZipInputStream import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.concurrent.duration._ +import scala.language.postfixOps import com.codahale.metrics.Counter import 
com.google.common.io.{ByteStreams, Files} @@ -348,8 +349,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // start initial job val d = sc.parallelize(1 to 10) d.count() - val stdInterval = interval(100.milliseconds) - val appId = eventually(timeout(20.seconds), stdInterval) { + val stdInterval = interval(100 milliseconds) + val appId = eventually(timeout(20 seconds), stdInterval) { val json = getContentAndCode("applications", port)._2.get val apps = parse(json).asInstanceOf[JArray].arr apps should have size 1 @@ -429,7 +430,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers d2.count() dumpLogDir("After second job") - val stdTimeout = timeout(10.seconds) + val stdTimeout = timeout(10 seconds) logDebug("waiting for UI to update") eventually(stdTimeout, stdInterval) { assert(2 === getNumJobs(""), diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 3f1add309b8e..7cbe4e342eaa 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.io.Source +import scala.language.postfixOps import org.json4s._ import org.json4s.jackson.JsonMethods._ @@ -139,7 +140,7 @@ class MasterSuite extends SparkFunSuite val localCluster = new LocalSparkCluster(2, 2, 512, conf) localCluster.start() try { - eventually(timeout(5.seconds), interval(100.milliseconds)) { + eventually(timeout(5 seconds), interval(100 milliseconds)) { val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json") .getLines().mkString("\n") val JArray(workers) = (parse(json) \ "workers") diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala index 5d130baffddd..cac15a1dc441 100644 --- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.launcher import java.util.concurrent.TimeUnit import scala.concurrent.duration._ +import scala.language.postfixOps import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -52,13 +53,13 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers { .startApplication() try { - eventually(timeout(30.seconds), interval(100.millis)) { + eventually(timeout(30 seconds), interval(100 millis)) { handle.getAppId() should not be (null) } handle.stop() - eventually(timeout(30.seconds), interval(100.millis)) { + eventually(timeout(30 seconds), interval(100 millis)) { handle.getState() should be (SparkAppHandle.State.KILLED) } } finally { diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index d38e3bf2306b..acdf21df9a16 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -26,6 +26,7 @@ import scala.collection.mutable import scala.collection.JavaConverters._ import scala.concurrent.Await import scala.concurrent.duration._ +import scala.language.postfixOps import com.google.common.io.Files import org.mockito.Matchers.any @@ -77,7 +78,7 @@ 
abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) rpcEndpointRef.send("hello") - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { assert("hello" === message) } } @@ -98,7 +99,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely") try { rpcEndpointRef.send("hello") - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { assert("hello" === message) } } finally { @@ -178,7 +179,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { try { // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause val e = intercept[SparkException] { - rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1.millis, shortProp)) + rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp)) } // The SparkException cause should be a RpcTimeoutException with message indicating the // controlling timeout property @@ -234,7 +235,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { assert(e.getMessage === "Oops!") } } @@ -259,7 +260,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { env.stop(endpointRef) - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { assert(e.getMessage === "Oops!") } } @@ -280,7 +281,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { endpointRef.send("Foo") - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { assert(e.getMessage === "Oops!") } } @@ -301,7 +302,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { // Calling `self` in `onStart` is fine assert(callSelfSuccessfully === true) } @@ -322,7 +323,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { endpointRef.send("Foo") - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { // Calling `self` in `receive` is fine assert(callSelfSuccessfully === true) } @@ -345,7 +346,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { env.stop(endpointRef) - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { // Calling `self` in `onStop` will return null, so selfOption will be None assert(selfOption == None) } @@ -374,7 +375,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { }.start() } - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(result == 1000) } @@ -399,7 +400,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { env.stop(endpointRef) env.stop(endpointRef) - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { // Calling stop twice should only trigger onStop once. 
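// Patch 11 deliberately restores the bare postfix durations in these suites
// and re-adds the language import per file, trading one import line for the
// terser `5 seconds` style used heavily in test timeouts. A sketch of the
// resulting file-level pattern (object and value names are illustrative):
import scala.concurrent.duration._
import scala.language.postfixOps

object PostfixDurationSketch {
  // With postfixOps enabled, argument-less calls may drop the dot:
  val pollTimeout = 5 seconds
  val pollInterval = 5 millis
}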
assert(onStopCount == 1) } @@ -415,7 +416,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { }) val f = endpointRef.ask[String]("Hi") - val ack = ThreadUtils.awaitResult(f, 5.seconds) + val ack = ThreadUtils.awaitResult(f, 5 seconds) assert("ack" === ack) env.stop(endpointRef) @@ -435,7 +436,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely") try { val f = rpcEndpointRef.ask[String]("hello") - val ack = ThreadUtils.awaitResult(f, 5.seconds) + val ack = ThreadUtils.awaitResult(f, 5 seconds) assert("ack" === ack) } finally { anotherEnv.shutdown() @@ -454,7 +455,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val f = endpointRef.ask[String]("Hi") val e = intercept[SparkException] { - ThreadUtils.awaitResult(f, 5.seconds) + ThreadUtils.awaitResult(f, 5 seconds) } assert("Oops" === e.getCause.getMessage) @@ -476,7 +477,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { try { val f = rpcEndpointRef.ask[String]("hello") val e = intercept[SparkException] { - ThreadUtils.awaitResult(f, 5.seconds) + ThreadUtils.awaitResult(f, 5 seconds) } assert("Oops" === e.getCause.getMessage) } finally { @@ -528,14 +529,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // Send a message to set up the connection serverRefInServer2.send("hello") - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(events.contains(("onConnected", serverEnv2.address))) } serverEnv2.shutdown() serverEnv2.awaitTermination() - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(events.contains(("onConnected", serverEnv2.address))) assert(events.contains(("onDisconnected", serverEnv2.address))) } @@ -556,7 +557,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // Send a message to set up the connection serverRefInClient.send("hello") - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { // We don't know the exact client address but at least we can verify the message type assert(events.asScala.map(_._1).exists(_ == "onConnected")) } @@ -564,7 +565,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { clientEnv.shutdown() clientEnv.awaitTermination() - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { // We don't know the exact client address but at least we can verify the message type assert(events.asScala.map(_._1).exists(_ == "onConnected")) assert(events.asScala.map(_._1).exists(_ == "onDisconnected")) @@ -587,14 +588,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // Send a message to set up the connection serverRefInClient.send("hello") - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(events.contains(("onConnected", serverEnv.address))) } serverEnv.shutdown() serverEnv.awaitTermination() - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(events.contains(("onConnected", serverEnv.address))) assert(events.contains(("onDisconnected", serverEnv.address))) } @@ -622,7 +623,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { try { 
val f = rpcEndpointRef.ask[String]("hello") val e = intercept[SparkException] { - ThreadUtils.awaitResult(f, 1.seconds) + ThreadUtils.awaitResult(f, 1 seconds) } assert(e.getCause.isInstanceOf[NotSerializableException]) } finally { @@ -655,7 +656,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { }) val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "send-authentication") rpcEndpointRef.send("hello") - eventually(timeout(5.seconds), interval(10.millis)) { + eventually(timeout(5 seconds), interval(10 millis)) { assert("hello" === message) } } finally { @@ -737,8 +738,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } }) - val longTimeout = new RpcTimeout(1.second, "spark.rpc.long.timeout") - val shortTimeout = new RpcTimeout(10.millis, "spark.rpc.short.timeout") + val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout") + val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout") // Ask with immediate response, should complete successfully val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout) @@ -763,7 +764,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // once the future is complete to verify addMessageIfTimeout was invoked val reply3 = intercept[RpcTimeoutException] { - Await.result(fut3, 2000.millis) + Await.result(fut3, 2000 millis) }.getMessage // scalastyle:on awaitresult diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index f0477d4615f2..83288db92bb4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -21,6 +21,7 @@ import java.io.File import java.util.concurrent.TimeoutException import scala.concurrent.duration._ +import scala.language.postfixOps import org.apache.hadoop.mapred.{JobConf, OutputCommitter, TaskAttemptContext, TaskAttemptID} import org.mockito.Matchers @@ -158,7 +159,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { // It's an error if the job completes successfully even though no committer was authorized, // so throw an exception if the job was allowed to complete. val e = intercept[SparkException] { - ThreadUtils.awaitResult(futureAction, 5.seconds) + ThreadUtils.awaitResult(futureAction, 5 seconds) } assert(e.getCause.isInstanceOf[TimeoutException]) assert(tempDir.list().size === 0) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index d4b46e28f29b..9e472f900b65 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ +import scala.language.postfixOps import scala.util.control.NonFatal import com.google.common.util.concurrent.MoreExecutors @@ -60,7 +61,7 @@ private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: Task sparkEnv.blockManager.master.removeBlock(blockId) // removeBlock is asynchronous. 
Need to wait it's removed successfully try { - eventually(timeout(3.seconds), interval(200.milliseconds)) { + eventually(timeout(3 seconds), interval(200 milliseconds)) { assert(!sparkEnv.blockManager.master.contains(blockId)) } removeBlockSuccessfully = true @@ -247,3 +248,4 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local } } + diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 52ee8f0628b7..b9e3a364ee22 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.implicitConversions +import scala.language.postfixOps import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfter, Matchers} @@ -292,7 +293,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite // Add another normal block manager and test that 2x replication works makeBlockManager(10000, "anotherStore") - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(replicateAndGetNumCopies("a2") === 2) } } @@ -317,14 +318,14 @@ class BlockManagerReplicationSuite extends SparkFunSuite // Add another store, 3x replication should work now, 4x replication should only replicate 3x val newStore1 = makeBlockManager(storeSize, s"newstore1") - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(replicateAndGetNumCopies("a3", 3) === 3) } assert(replicateAndGetNumCopies("a4", 4) === 3) // Add another store, 4x replication should work now val newStore2 = makeBlockManager(storeSize, s"newstore2") - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(replicateAndGetNumCopies("a5", 4) === 4) } @@ -340,7 +341,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") } - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(replicateAndGetNumCopies("a7", 3) === 3) } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ddb69af2ccb2..87c8628ce97e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.Future import scala.language.implicitConversions +import scala.language.postfixOps import scala.reflect.ClassTag import org.mockito.{Matchers => mc} @@ -256,19 +257,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeBlock("a2-to-remove") master.removeBlock("a3-to-remove") - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(!store.hasLocalBlock("a1-to-remove")) master.getLocations("a1-to-remove") 
should have size 0 } - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(!store.hasLocalBlock("a2-to-remove")) master.getLocations("a2-to-remove") should have size 0 } - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(store.hasLocalBlock("a3-to-remove")) master.getLocations("a3-to-remove") should have size 0 } - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { val memStatus = master.getMemoryStatus.head._2 memStatus._1 should equal (40000L) memStatus._2 should equal (40000L) @@ -286,15 +287,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = false) - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { store.getSingleAndReleaseLock(rdd(0, 0)) should be (None) master.getLocations(rdd(0, 0)) should have size 0 } - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { store.getSingleAndReleaseLock(rdd(0, 1)) should be (None) master.getLocations(rdd(0, 1)) should have size 0 } - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { store.getSingleAndReleaseLock("nonrddblock") should not be (None) master.getLocations("nonrddblock") should have size (1) } @@ -359,7 +360,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // remove broadcast 1 block from both the stores asynchronously // and verify all broadcast 1 blocks have been removed master.removeBroadcast(1, removeFromMaster = true, blocking = false) - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(!driverStore.hasLocalBlock(broadcast1BlockId)) assert(!executorStore.hasLocalBlock(broadcast1BlockId)) } @@ -367,7 +368,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // remove broadcast 2 from both the stores asynchronously // and verify all broadcast 2 blocks have been removed master.removeBroadcast(2, removeFromMaster = true, blocking = false) - eventually(timeout(1000.milliseconds), interval(10.milliseconds)) { + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { assert(!driverStore.hasLocalBlock(broadcast2BlockId)) assert(!driverStore.hasLocalBlock(broadcast2BlockId2)) assert(!executorStore.hasLocalBlock(broadcast2BlockId)) @@ -871,7 +872,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } // Make sure get a1 doesn't hang and returns None. 
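// The eventually blocks in these storage tests poll for asynchronous cleanup
// instead of sleeping a fixed time: the body is retried until it passes or the
// timeout elapses, pausing `interval` between attempts. An illustrative
// stand-in for ScalaTest's helper (an assumption for exposition, not the real
// implementation):
import scala.concurrent.duration.FiniteDuration

object EventuallySketch {
  def retryEventually[T](timeout: FiniteDuration, interval: FiniteDuration)(block: => T): T = {
    val deadline = System.nanoTime() + timeout.toNanos
    while (true) {
      try {
        return block
      } catch {
        // Swallow failures until the deadline; the final failure propagates.
        case _: Throwable if System.nanoTime() < deadline =>
          Thread.sleep(interval.toMillis)
      }
    }
    sys.error("unreachable")
  }
}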
- failAfter(1.second) { + failAfter(1 second) { assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store") } } diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala index de6f436e3a58..6f7dddd4f760 100644 --- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala @@ -21,6 +21,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch} import scala.collection.JavaConverters._ import scala.concurrent.duration._ +import scala.language.postfixOps import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts @@ -41,7 +42,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() (1 to 100).foreach(eventLoop.post) - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert((1 to 100) === buffer.asScala.toSeq) } eventLoop.stop() @@ -76,7 +77,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(e === receivedError) } eventLoop.stop() @@ -98,7 +99,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(e === receivedError) assert(eventLoop.isActive) } @@ -153,7 +154,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { }.start() } - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(threadNum * eventsFromEachThread === receivedEventsCount) } eventLoop.stop() @@ -178,7 +179,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - failAfter(5.seconds) { + failAfter(5 seconds) { // Wait until we enter `onReceive` onReceiveLatch.await() eventLoop.stop() @@ -199,7 +200,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(!eventLoop.isActive) } } @@ -223,7 +224,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } } eventLoop.start() - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(!eventLoop.isActive) } assert(onStopCalled) @@ -246,7 +247,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(!eventLoop.isActive) } assert(onStopCalled) @@ -270,7 +271,7 @@ class EventLoopSuite extends SparkFunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5.seconds), interval(5.millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(!eventLoop.isActive) } assert(onStopCalled) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 2adfa9d86c26..1c93079497f6 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ 
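A note on the change every hunk above repeats: Scala's duration DSL can be written either as a postfix operator application (`5 seconds`), which trips the `postfixOps` language feature check, or as a plain method selection (`5.seconds`), which does not. The sketch below is mine, not part of the patch (the object and value names are invented); it shows the two spellings build the same FiniteDuration and why the dot form needs no language import.

    import scala.concurrent.duration._
    import scala.language.postfixOps

    object PostfixVsDotSketch {
      def main(args: Array[String]): Unit = {
        // Postfix operator application: without the postfixOps import above,
        // scalac -feature reports a feature warning here (and newer Scala
        // versions reject it outright).
        val postfix: FiniteDuration = 5 seconds

        // Plain method selection through the DurationInt implicit class;
        // no language import needed, which is why the patch prefers it.
        val dotted: FiniteDuration = 5.seconds

        assert(postfix == dotted)  // the same FiniteDuration either way
        println(dotted)            // prints "5 seconds"
      }
    }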
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 1c93079497f6..2adfa9d86c26 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually._
@@ -127,7 +126,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with
     clock.advance(batchDuration.milliseconds)
 
     // The eventually is required to ensure that all data in the batch has been processed.
-    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
       val flattenOutput = outputQueue.asScala.toSeq.flatten
       val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
         case (key, value) => (key.toString, value.toString)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 7bac1cc4b0ae..c0b8dd09899d 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.jboss.netty.channel.ChannelPipeline
 import org.jboss.netty.channel.socket.SocketChannel
@@ -55,11 +54,11 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
     try {
       val outputQueue = startContext(utils.getTestPort(), testCompression)
 
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      eventually(timeout(10.seconds), interval(100.milliseconds)) {
         utils.writeInput(input.asJava, testCompression)
       }
 
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      eventually(timeout(10.seconds), interval(100.milliseconds)) {
         val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
         outputEvents.foreach {
           event =>
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index b1d90b8a82d5..863fc60bafa0 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -25,7 +25,6 @@
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.Random
 
 import org.apache.kafka.clients.consumer._
@@ -238,7 +237,7 @@ class DirectKafkaStreamSuite
 
     // Send some initial messages before starting context
     kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+    eventually(timeout(10.seconds), interval(20.milliseconds)) {
       assert(getLatestOffset() > 3)
     }
     val offsetBeforeStart = getLatestOffset()
@@ -266,7 +265,7 @@ class DirectKafkaStreamSuite
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
@@ -288,7 +287,7 @@ class DirectKafkaStreamSuite
 
     // Send some initial messages before starting context
     kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+    eventually(timeout(10.seconds), interval(20.milliseconds)) {
       assert(getLatestOffset() >= 10)
     }
     val offsetBeforeStart = getLatestOffset()
@@ -318,7 +317,7 @@ class DirectKafkaStreamSuite
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
@@ -366,7 +365,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
@@ -405,7 +404,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()
@@ -428,7 +427,7 @@ class DirectKafkaStreamSuite
     def sendDataAndWaitForReceive(data: Seq[Int]) {
      val strings = data.map { _.toString}
      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
        assert(strings.forall { collectedData.contains })
      }
    }
@@ -595,7 +594,7 @@ class DirectKafkaStreamSuite
     estimator.updateRate(rate)  // Set a new rate.
     // Expect blocks of data equal to "rate", scaled by the interval length in secs.
     val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-    eventually(timeout(5.seconds), interval(10 milliseconds)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       // Assert that rate estimator values are used to determine maxMessagesPerPartition.
       // Funky "-" in message makes the complete assertion message read better.
       assert(collectedData.asScala.exists(_.size == expectedSize),
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index ab1c5055a253..aa695a4cc7c4 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
@@ -158,7 +157,7 @@ class DirectKafkaStreamSuite
     // Send some initial messages before starting context
     kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+    eventually(timeout(10.seconds), interval(20.milliseconds)) {
       assert(getLatestOffset() > 3)
     }
     val offsetBeforeStart = getLatestOffset()
@@ -180,7 +179,7 @@
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
@@ -203,7 +202,7 @@ class DirectKafkaStreamSuite
     // Send some initial messages before starting context
     kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+    eventually(timeout(10.seconds), interval(20.milliseconds)) {
       assert(getLatestOffset() >= 10)
     }
     val offsetBeforeStart = getLatestOffset()
@@ -226,7 +225,7 @@
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
@@ -274,7 +273,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
@@ -317,7 +316,7 @@
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()
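The Flume and Kafka suites above lean heavily on ScalaTest's `eventually`, which is why the duration spelling comes up so often. For readers unfamiliar with it, here is a minimal sketch of the retry loop in isolation; the object, thread, and flag are invented, and I use explicit `Span(...)` arguments rather than the implicit FiniteDuration-to-Span conversion that (as far as I can tell) lets the suites write `timeout(10.seconds)` directly.

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.{Millis, Seconds, Span}

    object EventuallySketch {
      def main(args: Array[String]): Unit = {
        @volatile var flag = false
        new Thread(new Runnable {
          override def run(): Unit = { Thread.sleep(100); flag = true }
        }).start()

        // Re-runs the block every 50 ms until it stops throwing, giving up
        // (and failing) once 10 s have elapsed -- the same retry loop the
        // suites spell as eventually(timeout(10.seconds), interval(50.milliseconds)).
        eventually(timeout(Span(10, Seconds)), interval(Span(50, Millis))) {
          assert(flag, "background thread has not flipped the flag yet")
        }
        println("condition met")
      }
    }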
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6a35ac14a8f6..0dfedadbaf0e 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.kafka
 
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.Random
 
 import kafka.serializer.StringDecoder
@@ -77,7 +76,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
 
     ssc.start()
 
-    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+    eventually(timeout(10000.milliseconds), interval(100.milliseconds)) {
       assert(result.synchronized { sent === result })
     }
   }
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 7b9aee39ffb7..c72e582e2bcb 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -21,7 +21,6 @@ import java.io.File
 
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.Random
 
 import kafka.serializer.StringDecoder
@@ -105,7 +104,7 @@ class ReliableKafkaStreamSuite
     }
     ssc.start()
 
-    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
       // A basic process verification for ReliableKafkaReceiver.
       // Verify whether received message number is equal to the sent message number.
       assert(data.size === result.size)
@@ -132,7 +131,7 @@
     stream.foreachRDD(_ => Unit)
     ssc.start()
 
-    eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
+    eventually(timeout(20000.milliseconds), interval(100.milliseconds)) {
       // Verify the offset for each group/topic to see whether they are equal to the expected one.
       topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
     }
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
index e1499a822099..c83c06ca7872 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ExecutorService, TimeoutException}
 
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
 import org.mockito.Matchers._
@@ -89,7 +88,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
     kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
     clock.advance(5 * checkpointInterval.milliseconds)
 
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, times(1)).checkpoint(seqNum)
       verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
     }
@@ -110,7 +109,7 @@
     kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
     clock.advance(checkpointInterval.milliseconds * 5)
 
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, atMost(1)).checkpoint(anyString())
     }
   }
@@ -133,7 +132,7 @@
     kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
     clock.advance(checkpointInterval.milliseconds)
 
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, times(1)).checkpoint(anyString())
     }
     // don't block test thread
@@ -141,11 +140,11 @@
       ExecutionContext.global)
 
     intercept[TimeoutException] {
-      Await.ready(f, 50 millis)
+      Await.ready(f, 50.millis)
     }
 
     clock.advance(checkpointInterval.milliseconds / 2)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, times(2)).checkpoint(anyString())
     }
   }
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 0e71bf9b8433..5da7c411df2e 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.kinesis
 
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.Random
 
 import com.amazonaws.regions.RegionUtils
@@ -188,7 +187,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.start()
 
     val testData = 1 to 10
-    eventually(timeout(120 seconds), interval(10 second)) {
+    eventually(timeout(120.seconds), interval(10.second)) {
       testUtils.pushData(testData, aggregateTestData)
       assert(collected.synchronized { collected === testData.toSet },
         "\nData received does not match data sent")
@@ -216,7 +215,7 @@
     ssc.start()
 
     val testData = 1 to 10
-    eventually(timeout(120 seconds), interval(10 second)) {
+    eventually(timeout(120.seconds), interval(10.second)) {
       testUtils.pushData(testData, aggregateTestData)
       val modData = testData.map(_ + 5)
       assert(collected.synchronized { collected === modData.toSet },
@@ -260,7 +259,7 @@
     // Run until there are at least 10 batches with some data in them
     // If this times out because numBatchesWithData is empty, then its likely that foreachRDD
     // function failed with exceptions, and nothing got added to `collectedData`
-    eventually(timeout(2 minutes), interval(1 seconds)) {
+    eventually(timeout(2.minutes), interval(1.seconds)) {
       testUtils.pushData(1 to 5, aggregateTestData)
       assert(isCheckpointPresent && numBatchesWithData > 10)
     }
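Besides `eventually`, the other consumer of these durations in the hunks above (and in FileBasedWriteAheadLog below) is `Await.ready`, which blocks on a Future with a FiniteDuration deadline. A small sketch of that pattern, under the same caveat that the names are invented:

    import java.util.concurrent.TimeoutException

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object AwaitSketch {
      def main(args: Array[String]): Unit = {
        val slow: Future[Int] = Future { Thread.sleep(1000); 42 }

        // Await.ready blocks for at most the given Duration; 50.millis
        // builds that FiniteDuration without any postfixOps import.
        try {
          Await.ready(slow, 50.millis)
          println("completed within 50 ms")
        } catch {
          case _: TimeoutException =>
            println("timed out after 50 ms, as the tests above expect")
        }
      }
    }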
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index f42402e1cc7d..239f44ab3889 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.scalatest.concurrent.Eventually._
@@ -221,7 +220,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("UNCACHE TABLE testData")
     assert(!spark.catalog.isCached("testData"), "Table 'testData' should not be cached")
 
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
     }
   }
@@ -237,7 +236,7 @@
       "Eagerly cached in-memory table should have already been materialized")
 
     spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
     }
   }
@@ -254,7 +253,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       "Eagerly cached in-memory table should have already been materialized")
 
     spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
     }
   }
@@ -275,7 +274,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       "Lazily cached in-memory table should have been materialized")
 
     spark.catalog.uncacheTable("testData")
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
     }
   }
@@ -365,7 +364,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
 
     System.gc()
 
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
         "batchStats accumulators should be cleared after GC when uncacheTable")
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9b689f01b8d3..ad4ff7ca4e36 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.parallel.ExecutionContextTaskSupport
 import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.language.postfixOps
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -188,7 +187,7 @@ private[streaming] class FileBasedWriteAheadLog(
       val f = Future { deleteFile(logInfo) }(executionContext)
       if (waitForCompletion) {
         import scala.concurrent.duration._
-        Await.ready(f, 1 second)
+        Await.ready(f, 1.second)
       }
     } catch {
       case e: RejectedExecutionException =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index feb5c30c6aa1..47f93aa08bea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -189,7 +188,7 @@ class ReceivedBlockHandlerSuite
 
     val cleanupThreshTime = 3000L
     handler.cleanupOldBlocks(cleanupThreshTime)
-    eventually(timeout(10000 millis), interval(10 millis)) {
+    eventually(timeout(10000.millis), interval(10.millis)) {
       getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
     }
   }
@@ -416,4 +415,3 @@ class ReceivedBlockHandlerSuite
   private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
 }
-
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index a2dbae149f31..3faecb821aff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler
 import java.util.concurrent.CountDownLatch
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.scalatest.concurrent.Eventually._
@@ -69,7 +68,7 @@ class JobGeneratorSuite extends TestSuiteBase {
     val longBatchNumber = 3 // 3rd batch will take a long time
     val longBatchTime = longBatchNumber * batchDuration.milliseconds
 
-    val testTimeout = timeout(10 seconds)
+    val testTimeout = timeout(10.seconds)
     val inputStream = ssc.receiverStream(new TestReceiver)
 
     inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 9c3b18e4ec5f..48f7546dd089 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import com.google.common.io.Files
 import org.apache.commons.lang3.SerializationUtils
@@ -167,7 +166,7 @@ abstract class BaseYarnClusterSuite
 
     val handle = launcher.startApplication()
     try {
-      eventually(timeout(2 minutes), interval(1 second)) {
+      eventually(timeout(2.minutes), interval(1.second)) {
         assert(handle.getState().isFinal())
       }
     } finally {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 8ab7b21c2213..d718458a5085 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -24,7 +24,6 @@ import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -176,7 +175,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .startApplication()
 
     try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(30.seconds), interval(100.millis)) {
         handle.getState() should be (SparkAppHandle.State.RUNNING)
       }
 
@@ -184,7 +183,7 @@
       handle.getAppId() should startWith ("application_")
       handle.stop()
 
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(30.seconds), interval(100.millis)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index e123e7854104..40818b6067fa 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -23,7 +23,6 @@ import java.util.EnumSet
 
 import scala.annotation.tailrec
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.service.ServiceStateException
@@ -303,7 +302,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     val execStateFile2 = s2.registeredExecutorFile
     recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
 
-    eventually(timeout(10 seconds), interval(5 millis)) {
+    eventually(timeout(10.seconds), interval(5.millis)) {
       assert(!execStateFile.exists())
     }
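For completeness, how the dot syntax being adopted throughout this patch works under the hood: `10.seconds` resolves through `scala.concurrent.duration.DurationInt`, an implicit value class that enriches `Int`. A stripped-down analogue (mine, not the real DurationInt) makes the mechanism visible:

    object DotSyntaxMechanics {
      final case class Millis(value: Long)

      // Analogous to scala.concurrent.duration.DurationInt: an implicit
      // value class whose methods become available on plain Int literals.
      implicit class RichIntDuration(private val n: Int) extends AnyVal {
        def millisLike: Millis = Millis(n.toLong)
        def secondsLike: Millis = Millis(n * 1000L)
      }

      def main(args: Array[String]): Unit = {
        // 10.secondsLike desugars to new RichIntDuration(10).secondsLike,
        // so no postfix operator (and no language import) is involved.
        println(10.secondsLike)  // Millis(10000)
      }
    }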