From 2b1bcdc37aa1b10084c2ac6abd5934756f37d080 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 1 Apr 2019 11:14:51 +0200 Subject: [PATCH 1/2] [MINOR][DSTREAMS] Add DStreamCheckpointData.cleanup warning if delete returns false --- .../spark/streaming/dstream/DStreamCheckpointData.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index ebfaa83c704b..dbbacd510d18 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -87,7 +87,9 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) if (fileSystem == null) { fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) } - fileSystem.delete(path, true) + if (!fileSystem.delete(path, true)) { + logWarning(s"Error deleting old checkpoint file '$file' for time $time") + } timeToCheckpointFile -= time logInfo("Deleted checkpoint file '" + file + "' for time " + time) } catch { From 859e6f93ab685753cf05c2963ecec44f9cbaf6ef Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 2 Apr 2019 09:10:00 +0200 Subject: [PATCH 2/2] Either success or failed message --- .../spark/streaming/dstream/DStreamCheckpointData.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index dbbacd510d18..b35f7d97233e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -87,11 +87,12 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) if (fileSystem == null) { fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) } - if (!fileSystem.delete(path, true)) { + if (fileSystem.delete(path, true)) { + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } else { logWarning(s"Error deleting old checkpoint file '$file' for time $time") } timeToCheckpointFile -= time - logInfo("Deleted checkpoint file '" + file + "' for time " + time) } catch { case e: Exception => logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)