Commit e4d8452

Update to keep postfix time operations
1 parent 4e81888 commit e4d8452

29 files changed: 155 additions (+), 124 deletions (−)
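
Context for the mechanical change repeated below: with `scala.concurrent.duration._` in scope, an `Int` becomes a `FiniteDuration` either through a dotted call (`30.seconds`) or through Scala's postfix notation (`30 seconds`). The postfix spelling is gated behind the feature import `scala.language.postfixOps`, and that import is lexically scoped, so every file that keeps the postfix spelling needs its own copy — hence the one-line import added near the top of each touched file. A minimal sketch of the two spellings (the object name is illustrative):

import scala.concurrent.duration._
// Enables the whitespace spelling below; without it the compiler emits a
// feature warning (escalated to an error in recent Scala versions).
import scala.language.postfixOps

object PostfixDurations extends App {
  val dotted: FiniteDuration = 30.seconds   // ordinary method call
  val postfix: FiniteDuration = 30 seconds  // postfix notation
  println(dotted == postfix)                // true: both build the same value
}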

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 9 additions & 8 deletions
@@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer
 import scala.concurrent.{Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.sys.process._
 
 import org.json4s._
@@ -112,7 +113,7 @@ private object FaultToleranceTest extends App with Logging {
     assertValidClusterState()
 
     killLeader()
-    delay(30.seconds)
+    delay(30 seconds)
     assertValidClusterState()
     createClient()
     assertValidClusterState()
@@ -126,12 +127,12 @@ private object FaultToleranceTest extends App with Logging {
 
     killLeader()
     addMasters(1)
-    delay(30.seconds)
+    delay(30 seconds)
     assertValidClusterState()
 
     killLeader()
     addMasters(1)
-    delay(30.seconds)
+    delay(30 seconds)
     assertValidClusterState()
   }
 
@@ -156,7 +157,7 @@ private object FaultToleranceTest extends App with Logging {
     killLeader()
     workers.foreach(_.kill())
     workers.clear()
-    delay(30.seconds)
+    delay(30 seconds)
     addWorkers(2)
     assertValidClusterState()
   }
@@ -174,7 +175,7 @@ private object FaultToleranceTest extends App with Logging {
 
     (1 to 3).foreach { _ =>
       killLeader()
-      delay(30.seconds)
+      delay(30 seconds)
       assertValidClusterState()
       assertTrue(getLeader == masters.head)
       addMasters(1)
@@ -264,7 +265,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     // Avoid waiting indefinitely (e.g., we could register but get no executors).
-    assertTrue(ThreadUtils.awaitResult(f, 120.seconds))
+    assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
   }
 
   /**
@@ -317,7 +318,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     try {
-      assertTrue(ThreadUtils.awaitResult(f, 120.seconds))
+      assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
     } catch {
       case e: TimeoutException =>
         logError("Master states: " + masters.map(_.state))
@@ -421,7 +422,7 @@ private object SparkDocker {
     }
 
     dockerCmd.run(ProcessLogger(findIpAndLog _))
-    val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
+    val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
     val dockerId = Docker.getLastProcessId
     (ip, dockerId, outFile)
   }
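
The `awaitResult` call sites above pass the postfix duration straight through an argument list, where it parses unambiguously. A standard-library sketch of the same pattern, using `scala.concurrent.Await.result` (which plays a similar role to Spark's `ThreadUtils.awaitResult` here; the future is a stand-in):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

object AwaitDemo extends App {
  val f: Future[Boolean] = Future { true }  // stand-in for the test's future
  // Either spelling satisfies Await.result's atMost: Duration parameter.
  val ok: Boolean = Await.result(f, 120 seconds)
  println(ok)
}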

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 1 deletion
@@ -27,6 +27,7 @@ import scala.collection.Map
 import scala.collection.mutable.{HashMap, HashSet, Stack}
 import scala.concurrent.duration._
 import scala.language.existentials
+import scala.language.postfixOps
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.SerializationUtils
@@ -232,7 +233,7 @@ class DAGScheduler(
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
     blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
-      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600.seconds, "BlockManagerHeartbeat"))
+      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
 
   /**

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -21,6 +21,7 @@ import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.util.{Random, Try}
 
 import com.esotericsoftware.kryo.Kryo
@@ -261,10 +262,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(RpcUtils.retryWaitMs(conf) === 2L)
 
     conf.set("spark.akka.askTimeout", "3")
-    assert(RpcUtils.askRpcTimeout(conf).duration === (3.seconds))
+    assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
 
     conf.set("spark.akka.lookupTimeout", "4")
-    assert(RpcUtils.lookupRpcTimeout(conf).duration === (4.seconds))
+    assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
   }
 
   test("SPARK-13727") {

core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala

Lines changed: 15 additions & 14 deletions
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import scala.concurrent.duration._
 import scala.language.implicitConversions
+import scala.language.postfixOps
 
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -30,25 +31,25 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
   test("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
-    val jobId: Int = eventually(timeout(10.seconds)) {
+    val jobId: Int = eventually(timeout(10 seconds)) {
       val jobIds = jobFuture.jobIds
       jobIds.size should be(1)
       jobIds.head
     }
-    val jobInfo = eventually(timeout(10.seconds)) {
+    val jobInfo = eventually(timeout(10 seconds)) {
       sc.statusTracker.getJobInfo(jobId).get
     }
     jobInfo.status() should not be FAILED
     val stageIds = jobInfo.stageIds()
     stageIds.size should be(2)
 
-    val firstStageInfo = eventually(timeout(10.seconds)) {
+    val firstStageInfo = eventually(timeout(10 seconds)) {
      sc.statusTracker.getStageInfo(stageIds(0)).get
     }
     firstStageInfo.stageId() should be(stageIds(0))
     firstStageInfo.currentAttemptId() should be(0)
     firstStageInfo.numTasks() should be(2)
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(10 seconds)) {
       val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds(0)).get
       updatedFirstStageInfo.numCompletedTasks() should be(2)
       updatedFirstStageInfo.numActiveTasks() should be(0)
@@ -60,27 +61,27 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
     sc = new SparkContext("local", "test", new SparkConf(false))
     // Passing `null` should return jobs that were not run in a job group:
     val defaultJobGroupFuture = sc.parallelize(1 to 1000).countAsync()
-    val defaultJobGroupJobId = eventually(timeout(10.seconds)) {
+    val defaultJobGroupJobId = eventually(timeout(10 seconds)) {
       defaultJobGroupFuture.jobIds.head
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(10 seconds)) {
       sc.statusTracker.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId))
     }
     // Test jobs submitted in job groups:
     sc.setJobGroup("my-job-group", "description")
     sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq.empty)
     val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
-    val firstJobId = eventually(timeout(10.seconds)) {
+    val firstJobId = eventually(timeout(10 seconds)) {
       firstJobFuture.jobIds.head
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(10 seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
     }
     val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
-    val secondJobId = eventually(timeout(10.seconds)) {
+    val secondJobId = eventually(timeout(10 seconds)) {
       secondJobFuture.jobIds.head
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(10 seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (
         Set(firstJobId, secondJobId))
     }
@@ -91,10 +92,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
     sc.setJobGroup("my-job-group2", "description")
     sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
     val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
-    val firstJobId = eventually(timeout(10.seconds)) {
+    val firstJobId = eventually(timeout(10 seconds)) {
       firstJobFuture.jobIds.head
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(10 seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId))
     }
   }
@@ -104,10 +105,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
     sc.setJobGroup("my-job-group2", "description")
     sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
     val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)
-    val firstJobId = eventually(timeout(10.seconds)) {
+    val firstJobId = eventually(timeout(10 seconds)) {
       firstJobFuture.jobIds.head
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(10 seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2
     }
   }
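
All of the rewritten calls in this suite follow ScalaTest's `eventually` polling pattern: retry the block until it passes or the timeout elapses. A hedged sketch of that pattern using ScalaTest's `SpanSugar` for the postfix span (Spark's suites convert `scala.concurrent.duration` values through their own implicits instead; `conditionHolds` is a stand-in):

import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyDemo extends App {
  def conditionHolds(): Boolean = true  // stand-in for a real check

  // Retries the block until it passes or 10 seconds elapse, re-checking
  // on ScalaTest's default polling interval.
  eventually(timeout(10 seconds)) {
    assert(conditionHolds())
  }
}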

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.hdfs.DistributedFileSystem
@@ -367,7 +368,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1.second), interval(10.millis)) {
+      eventually(timeout(1 second), interval(10 millis)) {
        provider.getConfig().keys should not contain ("HDFS State")
       }
     } finally {
@@ -385,7 +386,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1.second), interval(10.millis)) {
+      eventually(timeout(1 second), interval(10 millis)) {
        verify(errorHandler).uncaughtException(any(), any())
       }
     } finally {
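
The duration DSL accepts singular, abbreviated, and long spellings alike, so the `1 second` and `10 millis` forms above are as valid as `30 seconds`. A quick sketch:

import scala.concurrent.duration._
import scala.language.postfixOps

object SpellingsDemo extends App {
  val a = 1 second          // singular spelling
  val b = 10 millis         // abbreviated spelling
  val c = 100 milliseconds  // long spelling
  println(Seq(a, b, c))     // List(1 second, 10 milliseconds, 100 milliseconds)
}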

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -23,6 +23,7 @@ import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.codahale.metrics.Counter
 import com.google.common.io.{ByteStreams, Files}
@@ -348,8 +349,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     // start initial job
     val d = sc.parallelize(1 to 10)
     d.count()
-    val stdInterval = interval(100.milliseconds)
-    val appId = eventually(timeout(20.seconds), stdInterval) {
+    val stdInterval = interval(100 milliseconds)
+    val appId = eventually(timeout(20 seconds), stdInterval) {
       val json = getContentAndCode("applications", port)._2.get
       val apps = parse(json).asInstanceOf[JArray].arr
       apps should have size 1
@@ -429,7 +430,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     d2.count()
     dumpLogDir("After second job")
 
-    val stdTimeout = timeout(10.seconds)
+    val stdTimeout = timeout(10 seconds)
     logDebug("waiting for UI to update")
     eventually(stdTimeout, stdInterval) {
       assert(2 === getNumJobs(""),

core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.io.Source
+import scala.language.postfixOps
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
@@ -139,7 +140,7 @@ class MasterSuite extends SparkFunSuite
     val localCluster = new LocalSparkCluster(2, 2, 512, conf)
     localCluster.start()
     try {
-      eventually(timeout(5.seconds), interval(100.milliseconds)) {
+      eventually(timeout(5 seconds), interval(100 milliseconds)) {
        val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
          .getLines().mkString("\n")
        val JArray(workers) = (parse(json) \ "workers")

core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.launcher
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -52,13 +53,13 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
       .startApplication()
 
     try {
-      eventually(timeout(30.seconds), interval(100.millis)) {
+      eventually(timeout(30 seconds), interval(100 millis)) {
         handle.getAppId() should not be (null)
       }
 
       handle.stop()
 
-      eventually(timeout(30.seconds), interval(100.millis)) {
+      eventually(timeout(30 seconds), interval(100 millis)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
