From 22e02b9ab45e453616072c69bea856874708618f Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 9 Feb 2017 18:32:16 -0800 Subject: [PATCH 1/3] Delete the temp checkpoint if a query is stopped without errors --- .../execution/streaming/StreamExecution.scala | 23 ++++++++++++++-- .../sql/streaming/StreamingQueryManager.scala | 6 ++++- .../test/DataStreamReaderWriterSuite.scala | 26 +++++++++++++++++++ 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ea3719421b8a..c3c98adf0c35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.IOException import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.locks.ReentrantLock @@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created * and the results are committed transactionally to the given [[Sink]]. + * + * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without + * errors */ class StreamExecution( override val sparkSession: SparkSession, override val name: String, - checkpointRoot: String, + val checkpointRoot: String, analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, - val outputMode: OutputMode) + val outputMode: OutputMode, + deleteCheckpointOnStop: Boolean) extends StreamingQuery with ProgressReporter with Logging { import org.apache.spark.sql.streaming.StreamingQueryListener._ @@ -323,6 +328,20 @@ class StreamExecution( sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString))) + + // Delete the temp checkpoint only when the query didn't fail + if (deleteCheckpointOnStop && exception.isEmpty) { + val checkpointPath = new Path(checkpointRoot) + try { + val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + fs.delete(checkpointPath, true) + } catch { + case e: IOException => + // Deleting temp checkpoint folder is best effort, don't throw IOException when we + // cannot delete them. + logWarning(s"Cannot delete $checkpointPath", e) + } + } } finally { terminationLatch.countDown() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 0b9406b027f5..38edb40dfb78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -195,6 +195,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { recoverFromCheckpointLocation: Boolean, trigger: Trigger, triggerClock: Clock): StreamingQueryWrapper = { + var deleteCheckpointOnStop = false val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => new Path(userSpecified).toUri.toString }.orElse { @@ -203,6 +204,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } }.getOrElse { if (useTempCheckpointLocation) { + // Delete the temp checkpoint when a query is being stopped without errors. + deleteCheckpointOnStop = true Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath } else { throw new AnalysisException( @@ -244,7 +247,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { sink, trigger, triggerClock, - outputMode)) + outputMode, + deleteCheckpointOnStop)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 30a957ef8128..0470411a0f10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -670,4 +670,30 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with Pr } } } + + test("temp checkpoint dir should be deleted if a query is stopped without errors") { + import testImplicits._ + val query = MemoryStream[Int].toDS.writeStream.format("console").start() + val checkpointDir = new Path( + query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot) + val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf()) + assert(fs.exists(checkpointDir)) + query.stop() + assert(!fs.exists(checkpointDir)) + } + + testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") { + import testImplicits._ + val input = MemoryStream[Int] + val query = input.toDS.map(_ / 0).writeStream.format("console").start() + val checkpointDir = new Path( + query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot) + val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf()) + assert(fs.exists(checkpointDir)) + input.addData(1) + intercept[StreamingQueryException] { + query.awaitTermination() + } + assert(fs.exists(checkpointDir)) + } } From e5fa7d76669a9a61326e032d48dec6df0d91d9c5 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 9 Feb 2017 20:09:33 -0800 Subject: [PATCH 2/3] Add checkpoint location log --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index c3c98adf0c35..3fb2ec02b124 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -218,6 +218,7 @@ class StreamExecution( * has been posted to all the listeners. */ def start(): Unit = { + logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.") microBatchThread.setDaemon(true) microBatchThread.start() startLatch.await() // Wait until thread started and QueryStart event has been posted From cae981f1ab9ee6f3d0d3cd4b53ccf6431551a0c0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 10 Feb 2017 14:22:54 -0800 Subject: [PATCH 3/3] Use NonFatal --- .../spark/sql/execution/streaming/StreamExecution.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3fb2ec02b124..3149ef04f7d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -337,9 +337,9 @@ class StreamExecution( val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) fs.delete(checkpointPath, true) } catch { - case e: IOException => - // Deleting temp checkpoint folder is best effort, don't throw IOException when we - // cannot delete them. + case NonFatal(e) => + // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions + // when we cannot delete them. logWarning(s"Cannot delete $checkpointPath", e) } }