Skip to content

Commit 3939432

Browse files
committed
Rename StreamingTestWaiter to BatchCounter
1 parent 0b9c3a1 commit 3939432

File tree

3 files changed

+16
-17
lines changed

3 files changed

+16
-17
lines changed

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ class CheckpointSuite extends TestSuiteBase {
343343
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
344344
ssc.checkpoint(checkpointDir)
345345
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
346-
val waiter = new StreamingTestWaiter(ssc)
346+
val batchCounter = new BatchCounter(ssc)
347347
val fileStream = ssc.textFileStream(testDir.toString)
348348
// Make value 3 take a large time to process, to ensure that the driver
349349
// shuts down in the middle of processing the 3rd batch
@@ -374,14 +374,14 @@ class CheckpointSuite extends TestSuiteBase {
374374
if (i != 3) {
375375
// Since we want to shut down while the 3rd batch is processing
376376
eventually(timeout(batchDuration * 5)) {
377-
assert(waiter.getNumCompletedBatches === i)
377+
assert(batchCounter.getNumCompletedBatches === i)
378378
}
379379
}
380380
}
381381
clock.addToTime(batchDuration.milliseconds)
382382
eventually(timeout(batchDuration * 5)) {
383383
// Wait until all files have been recorded and all batches have started
384-
assert(recordedFiles(ssc) === Seq(1, 2, 3) && waiter.getNumStartedBatches === 3)
384+
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
385385
}
386386
// Wait for a checkpoint to be written
387387
val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
@@ -390,7 +390,7 @@ class CheckpointSuite extends TestSuiteBase {
390390
}
391391
ssc.stop()
392392
// Check that we shut down while the third batch was being processed
393-
assert(waiter.getNumCompletedBatches === 2)
393+
assert(batchCounter.getNumCompletedBatches === 2)
394394
assert(outputStream.output.flatten === Seq(1, 3))
395395
}
396396

@@ -413,7 +413,7 @@ class CheckpointSuite extends TestSuiteBase {
413413
newClock.setTime(clock.currentTime())
414414
newClock
415415
}
416-
val waiter = new StreamingTestWaiter(ssc)
416+
val batchCounter = new BatchCounter(ssc)
417417
val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
418418
// Check that we remember files that were recorded before the restart
419419
assert(recordedFiles(ssc) === Seq(1, 2, 3))
@@ -425,7 +425,7 @@ class CheckpointSuite extends TestSuiteBase {
425425
writeFile(i, clock)
426426
clock.addToTime(batchDuration.milliseconds)
427427
eventually(timeout(batchDuration * 5)) {
428-
assert(waiter.getNumCompletedBatches === index + 1)
428+
assert(batchCounter.getNumCompletedBatches === index + 1)
429429
}
430430
}
431431
clock.addToTime(batchDuration.milliseconds)

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
246246
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
247247
// This `setTime` call ensures that the clock is past the creation time of `existingFile`
248248
clock.setTime(existingFile.lastModified + 1000)
249-
val waiter = new StreamingTestWaiter(ssc)
249+
val batchCounter = new BatchCounter(ssc)
250250
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
251251
testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
252252
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -264,7 +264,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
264264
assert(file.lastModified === clock.currentTime)
265265
logInfo("Created file " + file)
266266
eventually(timeout(batchDuration * 5)) {
267-
assert(waiter.getNumCompletedBatches === i)
267+
assert(batchCounter.getNumCompletedBatches === i)
268268
}
269269
}
270270

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,27 +107,26 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
107107
}
108108

109109
/**
110-
* An object that can be used to block until certain events occur, such as batch start/completion.
111-
* This is much less brittle than waiting on wall-clock time. Internally, this is implemented using
112-
* a StreamingListener. Constructing a new instance automatically registers a StreamingListener on
110+
* An object that counts the number of started / completed batches. This is implemented using a
111+
* StreamingListener. Constructing a new instance automatically registers a StreamingListener on
113112
* the given StreamingContext.
114113
*/
115-
class StreamingTestWaiter(ssc: StreamingContext) {
114+
class BatchCounter(ssc: StreamingContext) {
116115

117-
// All access to this state should be guarded by `StreamingTestWaiter.this.synchronized`
116+
// All access to this state should be guarded by `BatchCounter.this.synchronized`
118117
private var numCompletedBatches = 0
119118
private var numStartedBatches = 0
120119

121120
private val listener = new StreamingListener {
122121
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
123-
StreamingTestWaiter.this.synchronized {
122+
BatchCounter.this.synchronized {
124123
numStartedBatches += 1
125-
StreamingTestWaiter.this.notifyAll()
124+
BatchCounter.this.notifyAll()
126125
}
127126
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
128-
StreamingTestWaiter.this.synchronized {
127+
BatchCounter.this.synchronized {
129128
numCompletedBatches += 1
130-
StreamingTestWaiter.this.notifyAll()
129+
BatchCounter.this.notifyAll()
131130
}
132131
}
133132
ssc.addStreamingListener(listener)

0 commit comments

Comments
 (0)