Skip to content

Commit 277785a

Browse files
committed
Reduce the net change in this patch by reversing some unnecessary syntax changes along the way
1 parent b5e53df commit 277785a

File tree

7 files changed

+23
-23
lines changed

7 files changed

+23
-23
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ private[spark] class ExecutorAllocationManager(
227227
* This is factored out into its own method for testing.
228228
*/
229229
private def schedule(): Unit = synchronized {
230-
val now = clock.getTimeMillis()
230+
val now = clock.getTimeMillis
231231

232232
addOrCancelExecutorRequests(now)
233233

@@ -410,7 +410,7 @@ private[spark] class ExecutorAllocationManager(
410410
if (addTime == NOT_SET) {
411411
logDebug(s"Starting timer to add executors because pending tasks " +
412412
s"are building up (to expire in $schedulerBacklogTimeout seconds)")
413-
addTime = clock.getTimeMillis() + schedulerBacklogTimeout * 1000
413+
addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
414414
}
415415
}
416416

@@ -434,7 +434,7 @@ private[spark] class ExecutorAllocationManager(
434434
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
435435
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
436436
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
437-
removeTimes(executorId) = clock.getTimeMillis() + executorIdleTimeout * 1000
437+
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
438438
}
439439
} else {
440440
logWarning(s"Attempted to mark unknown executor $executorId idle")

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
330330
assert(addTime(manager) === NOT_SET)
331331
onSchedulerBacklogged(manager)
332332
val firstAddTime = addTime(manager)
333-
assert(firstAddTime === clock.getTimeMillis() + schedulerBacklogTimeout * 1000)
333+
assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
334334
clock.advance(100L)
335335
onSchedulerBacklogged(manager)
336336
assert(addTime(manager) === firstAddTime) // timer is already started
@@ -344,7 +344,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
344344
assert(addTime(manager) === NOT_SET)
345345
onSchedulerBacklogged(manager)
346346
val secondAddTime = addTime(manager)
347-
assert(secondAddTime === clock.getTimeMillis() + schedulerBacklogTimeout * 1000)
347+
assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
348348
clock.advance(100L)
349349
onSchedulerBacklogged(manager)
350350
assert(addTime(manager) === secondAddTime) // timer is already started
@@ -366,7 +366,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
366366
assert(removeTimes(manager).size === 1)
367367
assert(removeTimes(manager).contains("1"))
368368
val firstRemoveTime = removeTimes(manager)("1")
369-
assert(firstRemoveTime === clock.getTimeMillis() + executorIdleTimeout * 1000)
369+
assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
370370
clock.advance(100L)
371371
onExecutorIdle(manager, "1")
372372
assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
@@ -376,11 +376,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
376376
clock.advance(300L)
377377
onExecutorIdle(manager, "2")
378378
assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
379-
assert(removeTimes(manager)("2") === clock.getTimeMillis() + executorIdleTimeout * 1000)
379+
assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
380380
clock.advance(400L)
381381
onExecutorIdle(manager, "3")
382382
assert(removeTimes(manager)("3") !== firstRemoveTime)
383-
assert(removeTimes(manager)("3") === clock.getTimeMillis() + executorIdleTimeout * 1000)
383+
assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
384384
assert(removeTimes(manager).size === 3)
385385
assert(removeTimes(manager).contains("2"))
386386
assert(removeTimes(manager).contains("3"))
@@ -393,7 +393,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
393393
assert(removeTimes(manager).size === 3)
394394
assert(removeTimes(manager).contains("1"))
395395
val secondRemoveTime = removeTimes(manager)("1")
396-
assert(secondRemoveTime === clock.getTimeMillis() + executorIdleTimeout * 1000)
396+
assert(secondRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
397397
assert(removeTimes(manager)("1") === secondRemoveTime) // timer is already started
398398
assert(removeTimes(manager)("1") !== firstRemoveTime)
399399
assert(firstRemoveTime !== secondRemoveTime)

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
111111
conf: SparkConf,
112112
hadoopConf: Configuration,
113113
checkpointDir: String,
114-
clock: Clock = new SystemClock()
114+
clock: Clock = new SystemClock
115115
) extends ReceivedBlockHandler with Logging {
116116

117117
private val blockStoreTimeout = conf.getInt(

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private[streaming] class WriteAheadLogManager(
5151
rollingIntervalSecs: Int = 60,
5252
maxFailures: Int = 3,
5353
callerName: String = "",
54-
clock: Clock = new SystemClock()
54+
clock: Clock = new SystemClock
5555
) extends Logging {
5656

5757
private val pastLogs = new ArrayBuffer[LogInfo]

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
5353
val mapOutputTracker = new MapOutputTrackerMaster(conf)
5454
val shuffleManager = new HashShuffleManager(conf)
5555
val serializer = new KryoSerializer(conf)
56-
val manualClock = new ManualClock()
56+
val manualClock = new ManualClock
5757
val blockManagerSize = 10000000
5858

5959
var actorSystem: ActorSystem = null

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class ReceivedBlockTrackerSuite
9494
}
9595

9696
test("block addition, block to batch allocation and cleanup with write ahead log") {
97-
val manualClock = new ManualClock()
97+
val manualClock = new ManualClock
9898
// Set the time increment level to twice the rotation interval so that every increment creates
9999
// a new log file
100100

@@ -211,7 +211,7 @@ class ReceivedBlockTrackerSuite
211211
*/
212212
def createTracker(
213213
setCheckpointDir: Boolean = true,
214-
clock: Clock = new SystemClock()): ReceivedBlockTracker = {
214+
clock: Clock = new SystemClock): ReceivedBlockTracker = {
215215
val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
216216
val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
217217
allReceivedBlockTrackers += tracker

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
191191

192192
private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
193193
// Write data with manager, recover with new manager and verify
194-
val manualClock = new ManualClock()
194+
val manualClock = new ManualClock
195195
val dataToWrite = generateRandomData()
196196
manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
197197
val logFiles = getLogFilesInDirectory(testDir)
@@ -210,17 +210,17 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
210210

211211
test("WriteAheadLogManager - handling file errors while reading rotating logs") {
212212
// Generate a set of log files
213-
val fakeClock = new ManualClock()
213+
val manualClock = new ManualClock
214214
val dataToWrite1 = generateRandomData()
215-
writeDataUsingManager(testDir, dataToWrite1, fakeClock)
215+
writeDataUsingManager(testDir, dataToWrite1, manualClock)
216216
val logFiles1 = getLogFilesInDirectory(testDir)
217217
assert(logFiles1.size > 1)
218218

219219

220220
// Recover old files and generate a second set of log files
221221
val dataToWrite2 = generateRandomData()
222-
fakeClock.advance(100000)
223-
writeDataUsingManager(testDir, dataToWrite2, fakeClock)
222+
manualClock.advance(100000)
223+
writeDataUsingManager(testDir, dataToWrite2, manualClock)
224224
val logFiles2 = getLogFilesInDirectory(testDir)
225225
assert(logFiles2.size > logFiles1.size)
226226

@@ -276,15 +276,15 @@ object WriteAheadLogSuite {
276276
def writeDataUsingManager(
277277
logDirectory: String,
278278
data: Seq[String],
279-
fakeClock: ManualClock = new ManualClock(),
279+
manualClock: ManualClock = new ManualClock,
280280
stopManager: Boolean = true
281281
): WriteAheadLogManager = {
282-
if (fakeClock.getTimeMillis() < 100000) fakeClock.setTime(10000)
282+
if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
283283
val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
284-
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
284+
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
285285
// Ensure that 500 does not get sorted after 2000, so put a high base value.
286286
data.foreach { item =>
287-
fakeClock.advance(500)
287+
manualClock.advance(500)
288288
manager.writeToLog(item)
289289
}
290290
if (stopManager) manager.stop()

0 commit comments

Comments
 (0)