@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val batchCounter = new BatchCounter(_ssc)
     _ssc.start()
     // Make sure running at least one batch
-    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)) {
+      fail("The first batch cannot complete in 10 seconds")
+    }
+    // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
+    // `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
     _ssc.stop()
     assert(contextStoppingCollector.sparkExSeen)
   }
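
The `if`/`fail` check does more than report a timeout: once `waitUntilBatchesCompleted` returns true, the first batch has completed, which is what lets the comment above promise that `StreamingContextStoppingCollector` has already made its only `ssc.stop()` attempt before the main thread calls `_ssc.stop()`. Below is a minimal, self-contained sketch of that ordering argument, using toy names and a plain `CountDownLatch` rather than Spark's test helpers (nothing here is Spark API).

import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object WaitBeforeStopSketch {
  def main(args: Array[String]): Unit = {
    val firstBatchDone = new CountDownLatch(1)
    val listenerTriedStop = new AtomicBoolean(false)

    // "Listener thread": makes its single stop() attempt for the first batch,
    // then reports that batch as completed.
    val listenerThread = new Thread(new Runnable {
      override def run(): Unit = {
        listenerTriedStop.set(true)   // stands in for the listener's ssc.stop()
        firstBatchDone.countDown()    // stands in for batch 1 completing
      }
    })
    listenerThread.start()

    // "Main thread": mirrors the test -- wait for batch 1, fail on a timeout,
    // and only then stop, knowing the listener has already had its turn.
    if (!firstBatchDone.await(10000, TimeUnit.MILLISECONDS)) {
      sys.error("The first batch cannot complete in 10 seconds")
    }
    assert(listenerTriedStop.get())   // no concurrent stop() attempt can follow
    println("safe to stop the context from the main thread now")
  }
}
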
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
  */
 class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
   @volatile var sparkExSeen = false
+
+  private var isFirstBatch = true
+
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
-    try {
-      ssc.stop()
-    } catch {
-      case se: SparkException =>
-        sparkExSeen = true
+    if (isFirstBatch) {
+      // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
+      // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
+      // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+      isFirstBatch = false
+      try {
+        ssc.stop()
+      } catch {
+        case se: SparkException =>
+          sparkExSeen = true
+      }
     }
   }
 }
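
For reference, the listener-side half of the fix boils down to a one-shot guard: `stop()` is attempted only for the first batch, so later batch events can never contend with the main thread's `_ssc.stop()`. The following is a hypothetical, Spark-free sketch of that pattern (toy classes, not the real `StreamingContext` or listener API).

// Toy context whose stop() just records the call; in the suite above, calling
// ssc.stop() from inside a listener surfaces as a SparkException instead.
object GuardedStopSketch {
  class ToyContext {
    @volatile var stopped = false
    def stop(): Unit = synchronized {
      stopped = true
    }
  }

  // Mirrors StreamingContextStoppingCollector's guard: only the first batch
  // may trigger stop(), so later events never race the main thread's stop().
  class StoppingListener(ctx: ToyContext) {
    private var isFirstBatch = true
    def onBatchCompleted(): Unit = {
      if (isFirstBatch) {
        isFirstBatch = false
        ctx.stop()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val ctx = new ToyContext
    val listener = new StoppingListener(ctx)
    listener.onBatchCompleted()  // batch 1: the listener's one stop() attempt
    listener.onBatchCompleted()  // batch 2: the guard prevents a second attempt
    ctx.stop()                   // the main thread's stop() has nothing to race
    println(s"stopped = ${ctx.stopped}")
  }
}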