Skip to content

Commit 924c424

Browse files
brkyvztdas
authored and committed
[SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests
## What changes were proposed in this pull request? Some Structured Streaming tests show flakiness such as: ``` [info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds) [info] Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds. ``` This happens when we wait for the stream to stop, but it doesn't. The reason it doesn't stop is that we interrupt the microBatchThread, but Hadoop's `Shell.runCommand` swallows the interrupt exception, and the exception is not propagated upstream to the microBatchThread. Then this thread continues to run, only to start blocking on the `streamManualClock`. ## How was this patch tested? Thousands of retries locally and [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport) of the flaky tests Author: Burak Yavuz <[email protected]> Closes #17613 from brkyvz/flaky-stream-agg.
1 parent 99a9473 commit 924c424

File tree

2 files changed

+32
-30
lines changed

2 files changed

+32
-30
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -284,42 +284,38 @@ class StreamExecution(
284284
triggerExecutor.execute(() => {
285285
startTrigger()
286286

287-
val continueToRun =
288-
if (isActive) {
289-
reportTimeTaken("triggerExecution") {
290-
if (currentBatchId < 0) {
291-
// We'll do this initialization only once
292-
populateStartOffsets(sparkSessionToRunBatches)
293-
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
294-
} else {
295-
constructNextBatch()
296-
}
297-
if (dataAvailable) {
298-
currentStatus = currentStatus.copy(isDataAvailable = true)
299-
updateStatusMessage("Processing new data")
300-
runBatch(sparkSessionToRunBatches)
301-
}
287+
if (isActive) {
288+
reportTimeTaken("triggerExecution") {
289+
if (currentBatchId < 0) {
290+
// We'll do this initialization only once
291+
populateStartOffsets(sparkSessionToRunBatches)
292+
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
293+
} else {
294+
constructNextBatch()
302295
}
303-
// Report trigger as finished and construct progress object.
304-
finishTrigger(dataAvailable)
305296
if (dataAvailable) {
306-
// Update committed offsets.
307-
batchCommitLog.add(currentBatchId)
308-
committedOffsets ++= availableOffsets
309-
logDebug(s"batch ${currentBatchId} committed")
310-
// We'll increase currentBatchId after we complete processing current batch's data
311-
currentBatchId += 1
312-
} else {
313-
currentStatus = currentStatus.copy(isDataAvailable = false)
314-
updateStatusMessage("Waiting for data to arrive")
315-
Thread.sleep(pollingDelayMs)
297+
currentStatus = currentStatus.copy(isDataAvailable = true)
298+
updateStatusMessage("Processing new data")
299+
runBatch(sparkSessionToRunBatches)
316300
}
317-
true
301+
}
302+
// Report trigger as finished and construct progress object.
303+
finishTrigger(dataAvailable)
304+
if (dataAvailable) {
305+
// Update committed offsets.
306+
batchCommitLog.add(currentBatchId)
307+
committedOffsets ++= availableOffsets
308+
logDebug(s"batch ${currentBatchId} committed")
309+
// We'll increase currentBatchId after we complete processing current batch's data
310+
currentBatchId += 1
318311
} else {
319-
false
312+
currentStatus = currentStatus.copy(isDataAvailable = false)
313+
updateStatusMessage("Waiting for data to arrive")
314+
Thread.sleep(pollingDelayMs)
320315
}
316+
}
321317
updateStatusMessage("Waiting for next trigger")
322-
continueToRun
318+
isActive
323319
})
324320
updateStatusMessage("Stopped")
325321
} else {

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
277277

278278
def threadState =
279279
if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
280+
def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
281+
s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
282+
} else {
283+
""
284+
}
280285

281286
def testState =
282287
s"""
@@ -287,6 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
287292
|Output Mode: $outputMode
288293
|Stream state: $currentOffsets
289294
|Thread state: $threadState
295+
|$threadStackTrace
290296
|${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
291297
|
292298
|== Sink ==

0 commit comments

Comments
 (0)