From 772150fad512ee6598be515d2e5b44f3a6ed67d0 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Thu, 23 Feb 2017 18:17:01 -0800 Subject: [PATCH] Shutdown log watcher explicitly --- .../org/apache/spark/deploy/kubernetes/Client.scala | 10 +++++++--- .../deploy/kubernetes/LoggingPodStatusWatcher.scala | 4 ++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index c787d5917e381..6289ccf0ff20c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -119,18 +119,19 @@ private[spark] class Client( .withType("Opaque") .done() kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) + var loggingWatch = None: Option[LoggingPodStatusWatcher] try { val sslConfiguration = sslConfigurationProvider.getSslConfiguration() // start outer watch for status logging of driver pod val driverPodCompletedLatch = new CountDownLatch(1) // only enable interval logging if in waitForAppCompletion mode val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 - val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, - loggingInterval) + loggingWatch = Option(new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, + loggingInterval)) Utils.tryWithResource(kubernetesClient .pods() .withName(kubernetesAppId) - .watch(loggingWatch)) { _ => + .watch(loggingWatch.get)) { _ => val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, parsedCustomLabels, @@ -164,6 +165,9 @@ private[spark] class Client( } } finally { kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) + if (loggingWatch.nonEmpty) { + loggingWatch.get.shutdown() + } } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 17c3db8331ac4..2129c2f8bd0fa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -90,6 +90,10 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL private def closeWatch(): Unit = { podCompletedFuture.countDown() + shutdown() + } + + def shutdown(): Unit = { scheduler.shutdown() }