From 4bfdac025ff0906316cf7697933a7b374ae3b427 Mon Sep 17 00:00:00 2001 From: Patrick Brown Date: Mon, 29 Oct 2018 13:49:50 -0600 Subject: [PATCH 1/3] Update cleanupStages in AppStatusListener to delete tasks for all stages in a single pass --- .../spark/status/AppStatusListener.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index d52b7e8dae71e..741526d2721ff 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -1073,16 +1073,6 @@ private[spark] class AppStatusListener( kvstore.delete(e.getClass(), e.id) } - val tasks = kvstore.view(classOf[TaskDataWrapper]) - .index("stage") - .first(key) - .last(key) - .asScala - - tasks.foreach { t => - kvstore.delete(t.getClass(), t.taskId) - } - // Check whether there are remaining attempts for the same stage. If there aren't, then // also delete the RDD graph data. val remainingAttempts = kvstore.view(classOf[StageDataWrapper]) @@ -1105,6 +1095,15 @@ private[spark] class AppStatusListener( cleanupCachedQuantiles(key) } + + // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow + val tasks: Iterable[TaskDataWrapper] = kvstore.view(classOf[TaskDataWrapper]).asScala + val keys = stages.map(s => (s.info.stageId, s.info.attemptId)).toSet + tasks.foreach { t => + if (keys.contains((t.stageId, t.stageAttemptId))) { + kvstore.delete(t.getClass(), t.taskId) + } + } } private def cleanupTasks(stage: LiveStage): Unit = { From 178f7c3bf82f93177fce086037ece6ebf09bb350 Mon Sep 17 00:00:00 2001 From: Patrick Brown Date: Mon, 29 Oct 2018 13:55:38 -0600 Subject: [PATCH 2/3] remove uneeded type --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 741526d2721ff..186366c384916 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -1097,7 +1097,7 @@ private[spark] class AppStatusListener( } // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow - val tasks: Iterable[TaskDataWrapper] = kvstore.view(classOf[TaskDataWrapper]).asScala + val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala val keys = stages.map(s => (s.info.stageId, s.info.attemptId)).toSet tasks.foreach { t => if (keys.contains((t.stageId, t.stageAttemptId))) { From 82fef8686f4b71d94b3bfe44ea809b4d88f5fe70 Mon Sep 17 00:00:00 2001 From: Patrick Brown Date: Wed, 31 Oct 2018 12:18:53 -0600 Subject: [PATCH 3/3] fix style issue --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 186366c384916..e2c190ea198e0 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -1098,7 +1098,7 @@ private[spark] class AppStatusListener( // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala - val keys = stages.map(s => (s.info.stageId, s.info.attemptId)).toSet + val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet tasks.foreach { t => if (keys.contains((t.stageId, t.stageAttemptId))) { kvstore.delete(t.getClass(), t.taskId)