Skip to content

Commit 0aa4680

Browse files
committed
remove unnecessary
1 parent 732098c commit 0aa4680

File tree

1 file changed

+2
-10
lines changed

1 file changed

+2
-10
lines changed

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -499,15 +499,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
499499
}
500500
// rest of the records will be batched while it takes time for 3 to get written
501501
writeAsync(batchedWal, event2, 5L)
502-
eventually(timeout(1 second)) {
503-
assert(blockingWal.isBlocked)
504-
assert(batchedWal.invokePrivate(queueLength()) === 1)
505-
}
506502
writeAsync(batchedWal, event3, 8L)
507-
eventually(timeout(1 second)) {
508-
assert(blockingWal.isBlocked)
509-
assert(batchedWal.invokePrivate(queueLength()) === 2)
510-
}
511503
writeAsync(batchedWal, event4, 12L)
512504
eventually(timeout(1 second)) {
513505
assert(blockingWal.isBlocked)
@@ -521,7 +513,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
521513
blockingWal.allowWrite()
522514

523515
val buffer = wrapArrayArrayByte(Array(event1))
524-
val queuedEvents = Seq(event2, event3, event4, event5)
516+
val queuedEvents = Set(event2, event3, event4, event5)
525517

526518
eventually(timeout(1 second)) {
527519
assert(batchedWal.invokePrivate(queueLength()) === 0)
@@ -531,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
531523
val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
532524
verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
533525
val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
534-
assert(records === queuedEvents)
526+
assert(records.toSet === queuedEvents)
535527
}
536528
}
537529

0 commit comments

Comments
 (0)