From 732098cef6c19476134cfcfc14a077ed733a336f Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 1 Dec 2015 21:36:35 -0800
Subject: [PATCH 1/3] fix flaky test

---
 .../streaming/util/WriteAheadLogSuite.scala       | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index eaa88ea3cd38..ff4a5941383d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -499,8 +499,20 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     }
     // rest of the records will be batched while it takes time for 3 to get written
     writeAsync(batchedWal, event2, 5L)
+    eventually(timeout(1 second)) {
+      assert(blockingWal.isBlocked)
+      assert(batchedWal.invokePrivate(queueLength()) === 1)
+    }
     writeAsync(batchedWal, event3, 8L)
+    eventually(timeout(1 second)) {
+      assert(blockingWal.isBlocked)
+      assert(batchedWal.invokePrivate(queueLength()) === 2)
+    }
     writeAsync(batchedWal, event4, 12L)
+    eventually(timeout(1 second)) {
+      assert(blockingWal.isBlocked)
+      assert(batchedWal.invokePrivate(queueLength()) === 3)
+    }
     writeAsync(batchedWal, event5, 10L)
     eventually(timeout(1 second)) {
       assert(walBatchingThreadPool.getActiveCount === 5)
@@ -509,7 +521,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     blockingWal.allowWrite()

     val buffer = wrapArrayArrayByte(Array(event1))
-    val queuedEvents = Set(event2, event3, event4, event5)
+    val queuedEvents = Seq(event2, event3, event4, event5)

     eventually(timeout(1 second)) {
       assert(batchedWal.invokePrivate(queueLength()) === 0)
@@ -519,7 +531,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
       val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
       verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
       val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
-      assert(records.toSet === queuedEvents)
+      assert(records === queuedEvents)
     }
   }

From 0aa468007598fbdc77ba90c37b81a807a4e83d67 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 2 Dec 2015 13:21:34 -0800
Subject: [PATCH 2/3] remove unnecessary

---
 .../spark/streaming/util/WriteAheadLogSuite.scala | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index ff4a5941383d..1fe999d86357 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -499,15 +499,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     }
     // rest of the records will be batched while it takes time for 3 to get written
     writeAsync(batchedWal, event2, 5L)
-    eventually(timeout(1 second)) {
-      assert(blockingWal.isBlocked)
-      assert(batchedWal.invokePrivate(queueLength()) === 1)
-    }
     writeAsync(batchedWal, event3, 8L)
-    eventually(timeout(1 second)) {
-      assert(blockingWal.isBlocked)
-      assert(batchedWal.invokePrivate(queueLength()) === 2)
-    }
     writeAsync(batchedWal, event4, 12L)
     eventually(timeout(1 second)) {
       assert(blockingWal.isBlocked)
@@ -521,7 +513,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     blockingWal.allowWrite()

     val buffer = wrapArrayArrayByte(Array(event1))
-    val queuedEvents = Seq(event2, event3, event4, event5)
+    val queuedEvents = Set(event2, event3, event4, event5)

     eventually(timeout(1 second)) {
       assert(batchedWal.invokePrivate(queueLength()) === 0)
@@ -531,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
       val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
       verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
       val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
-      assert(records === queuedEvents)
+      assert(records.toSet === queuedEvents)
     }
   }

From d027fbda6b09b39febf9564e31ccf5890f174bb2 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 2 Dec 2015 17:32:33 -0800
Subject: [PATCH 3/3] fix ordering

---
 .../spark/streaming/util/BatchedWriteAheadLog.scala |  6 ++++--
 .../spark/streaming/util/WriteAheadLogSuite.scala   | 10 ++++++----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
index 6e6ed8d81972..862272bb4498 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
@@ -165,10 +165,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
       var segment: WriteAheadLogRecordHandle = null
       if (buffer.length > 0) {
         logDebug(s"Batched ${buffer.length} records for Write Ahead Log write")
+        // threads may not be able to add items in order by time
+        val sortedByTime = buffer.sortBy(_.time)
         // We take the latest record for the timestamp. Please refer to the class Javadoc for
         // detailed explanation
-        val time = buffer.last.time
-        segment = wrappedLog.write(aggregate(buffer), time)
+        val time = sortedByTime.last.time
+        segment = wrappedLog.write(aggregate(sortedByTime), time)
       }
       buffer.foreach(_.promise.success(segment))
     } catch {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 1fe999d86357..ef1e89df3130 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     p
   }

-  test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") {
+  test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") {
     val blockingWal = new BlockingWriteAheadLog(wal, walHandle)
     val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf)

@@ -500,12 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     // rest of the records will be batched while it takes time for 3 to get written
     writeAsync(batchedWal, event2, 5L)
     writeAsync(batchedWal, event3, 8L)
-    writeAsync(batchedWal, event4, 12L)
+    // we would like event 5 to be written before event 4 in order to test that they get
+    // sorted before being aggregated
+    writeAsync(batchedWal, event5, 12L)
     eventually(timeout(1 second)) {
       assert(blockingWal.isBlocked)
       assert(batchedWal.invokePrivate(queueLength()) === 3)
     }
-    writeAsync(batchedWal, event5, 10L)
+    writeAsync(batchedWal, event4, 10L)
     eventually(timeout(1 second)) {
       assert(walBatchingThreadPool.getActiveCount === 5)
       assert(batchedWal.invokePrivate(queueLength()) === 4)
@@ -521,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
       // the file name should be the timestamp of the last record, as events should be naturally
      // in order of timestamp, and we need the last element.
       val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
-      verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+      verify(wal, times(1)).write(bufferCaptor.capture(), meq(12L))
       val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
       assert(records.toSet === queuedEvents)
     }
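Note on the PATCH 3 fix: the batching thread drains all queued records into a buffer and writes them as one aggregated record, naming the log file by a timestamp. Because the producer threads enqueue concurrently, the drained buffer is not guaranteed to be ordered by time, so buffer.last.time may not be the maximum; sorting by time first restores the invariant that the last element carries the highest timestamp. The standalone Scala sketch below illustrates only that invariant; Record and batchTimestamp here are hypothetical stand-ins for illustration, not the actual Spark types:

  import scala.collection.mutable.ArrayBuffer

  object SortedBatchSketch extends App {
    // Hypothetical stand-in for the queued record type: payload plus write time.
    case class Record(data: String, time: Long)

    // Mirrors the fix: sort the drained buffer by time so the last element
    // carries the highest timestamp, which names the aggregated log file.
    def batchTimestamp(buffer: ArrayBuffer[Record]): Long = {
      val sortedByTime = buffer.sortBy(_.time)
      sortedByTime.last.time
    }

    // Records arrive out of order, as in the reordered test: 5L, 8L, 12L, 10L.
    val buffer = ArrayBuffer(
      Record("a", 5L), Record("b", 8L), Record("c", 12L), Record("d", 10L))
    assert(batchTimestamp(buffer) == 12L) // buffer.last.time would wrongly yield 10L
  }

This is also why the test now issues event5 (timestamp 12L) before event4 (timestamp 10L): without the sort, the batch would be named 10L, which is exactly what the old meq(10L) verification encoded, and the final patch updates it to meq(12L).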