Skip to content

Commit d027fbd

Browse files
committed
fix ordering
1 parent 0aa4680 commit d027fbd

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
165165
var segment: WriteAheadLogRecordHandle = null
166166
if (buffer.length > 0) {
167167
logDebug(s"Batched ${buffer.length} records for Write Ahead Log write")
168+
// threads may not be able to add items in order by time
169+
val sortedByTime = buffer.sortBy(_.time)
168170
// We take the latest record for the timestamp. Please refer to the class Javadoc for
169171
// detailed explanation
170-
val time = buffer.last.time
171-
segment = wrappedLog.write(aggregate(buffer), time)
172+
val time = sortedByTime.last.time
173+
segment = wrappedLog.write(aggregate(sortedByTime), time)
172174
}
173175
buffer.foreach(_.promise.success(segment))
174176
} catch {

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
480480
p
481481
}
482482

483-
test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") {
483+
test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") {
484484
val blockingWal = new BlockingWriteAheadLog(wal, walHandle)
485485
val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf)
486486

@@ -500,12 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
500500
// rest of the records will be batched while it takes time for 3 to get written
501501
writeAsync(batchedWal, event2, 5L)
502502
writeAsync(batchedWal, event3, 8L)
503-
writeAsync(batchedWal, event4, 12L)
503+
// we would like event 5 to be written before event 4 in order to test that they get
504+
// sorted before being aggregated
505+
writeAsync(batchedWal, event5, 12L)
504506
eventually(timeout(1 second)) {
505507
assert(blockingWal.isBlocked)
506508
assert(batchedWal.invokePrivate(queueLength()) === 3)
507509
}
508-
writeAsync(batchedWal, event5, 10L)
510+
writeAsync(batchedWal, event4, 10L)
509511
eventually(timeout(1 second)) {
510512
assert(walBatchingThreadPool.getActiveCount === 5)
511513
assert(batchedWal.invokePrivate(queueLength()) === 4)
@@ -521,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
521523
// the file name should be the highest timestamp among the aggregated records; events
522524
// may be queued out of timestamp order, so they are sorted before aggregation.
523525
val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
524-
verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
526+
verify(wal, times(1)).write(bufferCaptor.capture(), meq(12L))
525527
val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
526528
assert(records.toSet === queuedEvents)
527529
}

0 commit comments

Comments
 (0)