Skip to content

Commit 357eeb0

Browse files
zsxwingcmonkey
authored andcommitted
[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors
## What changes were proposed in this pull request? When a query uses a temp checkpoint dir, it's better to delete it if it's stopped without errors. ## How was this patch tested? New unit tests. Author: Shixiong Zhu <[email protected]> Closes apache#16880 from zsxwing/delete-temp-checkpoint.
1 parent 084f561 commit 357eeb0

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.streaming
1919

20+
import java.io.IOException
2021
import java.util.UUID
2122
import java.util.concurrent.{CountDownLatch, TimeUnit}
2223
import java.util.concurrent.locks.ReentrantLock
@@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
4142
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
4243
* [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
4344
* and the results are committed transactionally to the given [[Sink]].
45+
*
46+
* @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
47+
* errors
4448
*/
4549
class StreamExecution(
4650
override val sparkSession: SparkSession,
4751
override val name: String,
48-
checkpointRoot: String,
52+
val checkpointRoot: String,
4953
analyzedPlan: LogicalPlan,
5054
val sink: Sink,
5155
val trigger: Trigger,
5256
val triggerClock: Clock,
53-
val outputMode: OutputMode)
57+
val outputMode: OutputMode,
58+
deleteCheckpointOnStop: Boolean)
5459
extends StreamingQuery with ProgressReporter with Logging {
5560

5661
import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -213,6 +218,7 @@ class StreamExecution(
213218
* has been posted to all the listeners.
214219
*/
215220
def start(): Unit = {
221+
logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.")
216222
microBatchThread.setDaemon(true)
217223
microBatchThread.start()
218224
startLatch.await() // Wait until thread started and QueryStart event has been posted
@@ -323,6 +329,20 @@ class StreamExecution(
323329
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
324330
postEvent(
325331
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
332+
333+
// Delete the temp checkpoint only when the query didn't fail
334+
if (deleteCheckpointOnStop && exception.isEmpty) {
335+
val checkpointPath = new Path(checkpointRoot)
336+
try {
337+
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
338+
fs.delete(checkpointPath, true)
339+
} catch {
340+
case NonFatal(e) =>
341+
// Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
342+
// when we cannot delete them.
343+
logWarning(s"Cannot delete $checkpointPath", e)
344+
}
345+
}
326346
} finally {
327347
terminationLatch.countDown()
328348
}

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
195195
recoverFromCheckpointLocation: Boolean,
196196
trigger: Trigger,
197197
triggerClock: Clock): StreamingQueryWrapper = {
198+
var deleteCheckpointOnStop = false
198199
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
199200
new Path(userSpecified).toUri.toString
200201
}.orElse {
@@ -203,6 +204,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
203204
}
204205
}.getOrElse {
205206
if (useTempCheckpointLocation) {
207+
// Delete the temp checkpoint when a query is being stopped without errors.
208+
deleteCheckpointOnStop = true
206209
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
207210
} else {
208211
throw new AnalysisException(
@@ -244,7 +247,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
244247
sink,
245248
trigger,
246249
triggerClock,
247-
outputMode))
250+
outputMode,
251+
deleteCheckpointOnStop))
248252
}
249253

250254
/**

sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,4 +670,30 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with Pr
670670
}
671671
}
672672
}
673+
674+
test("temp checkpoint dir should be deleted if a query is stopped without errors") {
675+
import testImplicits._
676+
val query = MemoryStream[Int].toDS.writeStream.format("console").start()
677+
val checkpointDir = new Path(
678+
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
679+
val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
680+
assert(fs.exists(checkpointDir))
681+
query.stop()
682+
assert(!fs.exists(checkpointDir))
683+
}
684+
685+
testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
686+
import testImplicits._
687+
val input = MemoryStream[Int]
688+
val query = input.toDS.map(_ / 0).writeStream.format("console").start()
689+
val checkpointDir = new Path(
690+
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
691+
val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
692+
assert(fs.exists(checkpointDir))
693+
input.addData(1)
694+
intercept[StreamingQueryException] {
695+
query.awaitTermination()
696+
}
697+
assert(fs.exists(checkpointDir))
698+
}
673699
}

0 commit comments

Comments
 (0)