Skip to content

Commit c3c177d

Browse files
committed
Address comment
1 parent 66f04d8 commit c3c177d

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
182182
buffer.clear()
183183
}
184184
}
185+
186+
/** Method for querying the queue length. Should only be used in tests. */
187+
private def getQueueLength(): Int = walWriteQueue.size()
185188
}
186189

187190
/** Static methods for aggregating and de-aggregating records. */

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.mockito.invocation.InvocationOnMock
3636
import org.mockito.stubbing.Answer
3737
import org.scalatest.concurrent.Eventually
3838
import org.scalatest.concurrent.Eventually._
39-
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfter}
39+
import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach, BeforeAndAfter}
4040
import org.scalatest.mock.MockitoSugar
4141

4242
import org.apache.spark.streaming.scheduler._
@@ -314,7 +314,11 @@ class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
314314
class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
315315
allowBatching = true,
316316
closeFileAfterWrite = false,
317-
"BatchedWriteAheadLog") with MockitoSugar with BeforeAndAfterEach with Eventually {
317+
"BatchedWriteAheadLog")
318+
with MockitoSugar
319+
with BeforeAndAfterEach
320+
with Eventually
321+
with PrivateMethodTester {
318322

319323
import BatchedWriteAheadLog._
320324
import WriteAheadLogSuite._
@@ -325,6 +329,8 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
325329
private var walBatchingExecutionContext: ExecutionContextExecutorService = _
326330
private val sparkConf = new SparkConf()
327331

332+
private val queueLength = PrivateMethod[Int]('getQueueLength)
333+
328334
override def beforeEach(): Unit = {
329335
wal = mock[WriteAheadLog]
330336
walHandle = mock[WriteAheadLogRecordHandle]
@@ -390,20 +396,22 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
390396
eventually(timeout(1 second)) {
391397
assert(blockingWal.isBlocked)
392398
}
393-
// rest of the records will be batched while it takes 3 to get written
399+
// rest of the records will be batched while it takes time for 3 to get written
394400
writeAsync(batchedWal, event2, 5L)
395401
writeAsync(batchedWal, event3, 8L)
396402
writeAsync(batchedWal, event4, 12L)
397403
writeAsync(batchedWal, event5, 10L)
398404
eventually(timeout(1 second)) {
399405
assert(walBatchingThreadPool.getActiveCount === 5)
406+
assert(batchedWal.invokePrivate(queueLength()) === 4)
400407
}
401408
blockingWal.allowWrite()
402409

403410
val buffer1 = wrapArrayArrayByte(Array(event1))
404411
val buffer2 = wrapArrayArrayByte(Array(event2, event3, event4, event5))
405412

406413
eventually(timeout(1 second)) {
414+
assert(batchedWal.invokePrivate(queueLength()) === 0)
407415
verify(wal, times(1)).write(meq(buffer1), meq(3L))
408416
// the file name should be the timestamp of the last record, as events should be naturally
409417
// in order of timestamp, and we need the last element.

0 commit comments

Comments
 (0)