Commit 54d3a2c

stop the query rather than retry looping when task tries to retry

1 parent ad2f206

2 files changed: +10 -20 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
Lines changed: 1 addition & 9 deletions

@@ -89,15 +89,7 @@ class ContinuousExecution(
     }
 
     do {
-      try {
-        runContinuous(sparkSessionForStream)
-      } catch {
-        // Capture task retries (transformed to an exception by ContinuousDataSourceRDDIter) and
-        // convert them to global retries by letting the while loop spin.
-        case s: SparkException
-          if s.getCause != null && s.getCause.getCause != null &&
-            s.getCause.getCause.isInstanceOf[ContinuousTaskRetryException] => ()
-      }
+      runContinuous(sparkSessionForStream)
     } while (state.updateAndGet(stateUpdate) == ACTIVE)
   }

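For context, the comment in the removed block credits ContinuousDataSourceRDDIter with turning task retries into this exception: continuous processing cannot tolerate a task being retried mid-epoch, so the task side fails fast instead. A minimal sketch of such a guard, assuming (this is an assumption, not the patch's code) that retries are detected via the task's attempt number:

import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException

// Sketch of the assumed task-side guard: a continuous task must only run
// as attempt 0, since a retry would replay rows within the current epoch.
def assertNotRetry(context: TaskContext): Unit = {
  if (context.attemptNumber() != 0) {
    // Reaches the driver wrapped in a SparkException. Before this commit
    // the driver swallowed that and spun the retry loop above; after it,
    // the exception propagates and stops the query.
    throw new ContinuousTaskRetryException()
  }
}

With the catch gone, the exception now flows through the stream's normal error handling, so the query terminates with query.exception set instead of silently restarting under a new run ID.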
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
Lines changed: 9 additions & 11 deletions

@@ -201,7 +201,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
       StopStream)
   }
 
-  test("task failure triggers a ContinuousExecution restart") {
+  test("task failure kills the query") {
     val df = spark.readStream
       .format("rate")
       .option("numPartitions", "5")
@@ -218,23 +218,21 @@ class ContinuousSuite extends ContinuousSuiteBase {
     }
     spark.sparkContext.addSparkListener(listener)
 
-
-    var originalRunId: UUID = null
     testStream(df, useV2Sink = true)(
       StartStream(Trigger.Continuous(100)),
       Execute(waitForRateSourceTriggers(_, 2)),
       Execute { query =>
         // Wait until a task is started, then kill its first attempt.
         eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
-        originalRunId = query.runId
         spark.sparkContext.killTaskAttempt(taskId)
-      },
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      // Rather than just restarting the task we killed, there should have been a
-      // ContinuousExecution restart changing the run ID.
-      AssertOnQuery(_.runId != originalRunId),
-      CheckAnswerRowsContains(scala.Range(0, 20).map(Row(_))))
+        eventually(timeout(streamingTimeout)) {
+          assert(query.exception.isDefined)
+        }
+        assert(
+          query.exception.get.getCause != null &&
+            query.exception.get.getCause.getCause != null &&
+            query.exception.get.getCause.getCause.getCause.isInstanceOf[ContinuousTaskRetryException])
+      })
 
     spark.sparkContext.removeSparkListener(listener)
   }
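
Note the nesting the new assertion encodes: query.exception is the wrapper exception the query reports, and the ContinuousTaskRetryException sits three getCause levels below it, one level deeper than the removed driver-side catch checked (that catch saw the raw SparkException before the query wrapped it again). If that depth is incidental, walking the chain is a sturdier check; a hypothetical helper, not part of this patch:

import scala.annotation.tailrec
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException

// Hypothetical helper: search the whole cause chain instead of
// hard-coding how many wrapper exceptions sit between the query's
// reported failure and the task-level retry exception.
@tailrec
def causedByTaskRetry(t: Throwable): Boolean = t match {
  case null => false
  case _: ContinuousTaskRetryException => true
  case other => causedByTaskRetry(other.getCause)
}

// Usage in the test body would then be:
//   assert(causedByTaskRetry(query.exception.get))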
