Commit 6fd9e70

brkyvz authored and tdas committed
[SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high
We need to make sure that the last entry is indeed the last entry in the queue.

Author: Burak Yavuz <[email protected]>

Closes #10110 from brkyvz/batch-wal-test-fix.
1 parent 80a824d commit 6fd9e70
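
The gist of the fix, as a minimal standalone sketch (Record and SortBeforeNaming are illustrative names, not part of the patch): writer threads may enqueue records out of time order, so buffer.last.time can pick a stale timestamp for the WAL file name, while sorting by time first guarantees the highest timestamp is used.

// Hypothetical sketch, not Spark code: shows why the batch must be sorted by
// time before its last entry is used to name the write-ahead log file.
case class Record(data: String, time: Long)

object SortBeforeNaming {
  def main(args: Array[String]): Unit = {
    // Arrival order simulates the flaky case: the record with time 12L is enqueued first.
    val buffer = Seq(Record("event5", 12L), Record("event4", 10L))

    val naiveTime  = buffer.last.time                 // 10L -- not the highest timestamp
    val sortedTime = buffer.sortBy(_.time).last.time  // 12L -- highest timestamp

    println(s"naive file name: $naiveTime, sorted file name: $sortedTime")
  }
}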

File tree

2 files changed: +14 -6 lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala

Lines changed: 4 additions & 2 deletions
@@ -166,10 +166,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
       var segment: WriteAheadLogRecordHandle = null
       if (buffer.length > 0) {
         logDebug(s"Batched ${buffer.length} records for Write Ahead Log write")
+        // threads may not be able to add items in order by time
+        val sortedByTime = buffer.sortBy(_.time)
         // We take the latest record for the timestamp. Please refer to the class Javadoc for
         // detailed explanation
-        val time = buffer.last.time
-        segment = wrappedLog.write(aggregate(buffer), time)
+        val time = sortedByTime.last.time
+        segment = wrappedLog.write(aggregate(sortedByTime), time)
       }
       buffer.foreach(_.promise.success(segment))
     } catch {

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 10 additions & 4 deletions
@@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     p
   }

-  test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") {
+  test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") {
     val blockingWal = new BlockingWriteAheadLog(wal, walHandle)
     val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf)

@@ -500,8 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     // rest of the records will be batched while it takes time for 3 to get written
     writeAsync(batchedWal, event2, 5L)
     writeAsync(batchedWal, event3, 8L)
-    writeAsync(batchedWal, event4, 12L)
-    writeAsync(batchedWal, event5, 10L)
+    // we would like event 5 to be written before event 4 in order to test that they get
+    // sorted before being aggregated
+    writeAsync(batchedWal, event5, 12L)
+    eventually(timeout(1 second)) {
+      assert(blockingWal.isBlocked)
+      assert(batchedWal.invokePrivate(queueLength()) === 3)
+    }
+    writeAsync(batchedWal, event4, 10L)
     eventually(timeout(1 second)) {
       assert(walBatchingThreadPool.getActiveCount === 5)
       assert(batchedWal.invokePrivate(queueLength()) === 4)
@@ -517,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     // the file name should be the timestamp of the last record, as events should be naturally
     // in order of timestamp, and we need the last element.
     val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
-    verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+    verify(wal, times(1)).write(bufferCaptor.capture(), meq(12L))
     val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
     assert(records.toSet === queuedEvents)
   }
