From cac58e1d2b94fd80d7077387670545848d86733d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 18 Jan 2017 19:15:56 -0800 Subject: [PATCH 1/4] Fix StateStore --- .../streaming/state/StateStore.scala | 75 ++++++++++++------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index d59746f947c1..48173bdb72d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.state import java.util.concurrent.{ScheduledFuture, TimeUnit} +import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.util.control.NonFatal @@ -125,10 +126,36 @@ object StateStore extends Logging { val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60 private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]() - private val maintenanceTaskExecutor = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task") - @volatile private var maintenanceTask: ScheduledFuture[_] = null + class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) { + private val executor = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task") + + private val runnable = new Runnable { + override def run(): Unit = { + try { + doMaintenance() + } catch { + case NonFatal(e) => + logWarning("Error running maintenance thread", e) + future.cancel(false) // do interrupt active run, as this is being called from the run + onError + } + } + } + + private val future: ScheduledFuture[_] = executor.scheduleAtFixedRate( + runnable, periodMs, periodMs, TimeUnit.MILLISECONDS) + + def stop(): Unit = { future.cancel(false) } + + def isRunning: Boolean = !future.isCancelled + } + + @GuardedBy("loadedProviders") + @volatile private var maintenanceTask: MaintenanceTask = null + + @GuardedBy("loadedProviders") @volatile private var _coordRef: StateStoreCoordinatorRef = null /** Get or create a store associated with the id. */ @@ -162,7 +189,7 @@ object StateStore extends Logging { } def isMaintenanceRunning: Boolean = loadedProviders.synchronized { - maintenanceTask != null + maintenanceTask != null && maintenanceTask.isRunning } /** Unload and stop all state store providers */ @@ -170,7 +197,7 @@ object StateStore extends Logging { loadedProviders.clear() _coordRef = null if (maintenanceTask != null) { - maintenanceTask.cancel(false) + maintenanceTask.stop() maintenanceTask = null } logInfo("StateStore stopped") @@ -179,14 +206,14 @@ object StateStore extends Logging { /** Start the periodic maintenance task if not already started and if Spark active */ private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized { val env = SparkEnv.get - if (maintenanceTask == null && env != null) { + if (env != null && (maintenanceTask == null || !maintenanceTask.isRunning)) { val periodMs = env.conf.getTimeAsMs( MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s") - val runnable = new Runnable { - override def run(): Unit = { doMaintenance() } - } - maintenanceTask = maintenanceTaskExecutor.scheduleAtFixedRate( - runnable, periodMs, periodMs, TimeUnit.MILLISECONDS) + maintenanceTask = new MaintenanceTask( + periodMs, + task = { doMaintenance() }, + onError = { loadedProviders.synchronized { loadedProviders.clear() } } + ) logInfo("State Store maintenance task started") } } @@ -198,21 +225,19 @@ object StateStore extends Logging { private def doMaintenance(): Unit = { logDebug("Doing maintenance") if (SparkEnv.get == null) { - stop() - } else { - loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) => - try { - if (verifyIfStoreInstanceActive(id)) { - provider.doMaintenance() - } else { - unload(id) - logInfo(s"Unloaded $provider") - } - } catch { - case NonFatal(e) => - logWarning(s"Error managing $provider, stopping management thread") - stop() + throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores") + } + loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) => + try { + if (verifyIfStoreInstanceActive(id)) { + provider.doMaintenance() + } else { + unload(id) + logInfo(s"Unloaded $provider") } + } catch { + case NonFatal(e) => + logWarning(s"Error managing $provider, stopping management thread") } } } From 7557ade6b0a67e2887c871bfbe8720bb32947041 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 18 Jan 2017 19:17:51 -0800 Subject: [PATCH 2/4] Minor --- .../apache/spark/sql/execution/streaming/state/StateStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 48173bdb72d7..87b198967ad7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -134,7 +134,7 @@ object StateStore extends Logging { private val runnable = new Runnable { override def run(): Unit = { try { - doMaintenance() + task } catch { case NonFatal(e) => logWarning("Error running maintenance thread", e) From cbcd6e8ef715cd09389284020f6c49587318869a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 20 Jan 2017 11:55:39 -0800 Subject: [PATCH 3/4] Minor fix --- .../execution/streaming/state/StateStore.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 87b198967ad7..ce09677e4aa5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -125,6 +125,7 @@ object StateStore extends Logging { val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval" val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60 + @GuardedBy("loadedProviders") private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]() class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) { @@ -138,8 +139,8 @@ object StateStore extends Logging { } catch { case NonFatal(e) => logWarning("Error running maintenance thread", e) - future.cancel(false) // do interrupt active run, as this is being called from the run onError + throw e } } } @@ -147,16 +148,19 @@ object StateStore extends Logging { private val future: ScheduledFuture[_] = executor.scheduleAtFixedRate( runnable, periodMs, periodMs, TimeUnit.MILLISECONDS) - def stop(): Unit = { future.cancel(false) } + def stop(): Unit = { + future.cancel(false) + executor.shutdown() + } - def isRunning: Boolean = !future.isCancelled + def isRunning: Boolean = !future.isDone } @GuardedBy("loadedProviders") - @volatile private var maintenanceTask: MaintenanceTask = null + private var maintenanceTask: MaintenanceTask = null @GuardedBy("loadedProviders") - @volatile private var _coordRef: StateStoreCoordinatorRef = null + private var _coordRef: StateStoreCoordinatorRef = null /** Get or create a store associated with the id. */ def get( @@ -238,6 +242,7 @@ object StateStore extends Logging { } catch { case NonFatal(e) => logWarning(s"Error managing $provider, stopping management thread") + throw e } } } @@ -263,7 +268,7 @@ object StateStore extends Logging { } } - private def coordinatorRef: Option[StateStoreCoordinatorRef] = synchronized { + private def coordinatorRef: Option[StateStoreCoordinatorRef] = loadedProviders.synchronized { val env = SparkEnv.get if (env != null) { if (_coordRef == null) { From d66f8d74ac675796254105de7397bb7b6b3c960e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 20 Jan 2017 14:45:06 -0800 Subject: [PATCH 4/4] Address TD's comments --- .../spark/sql/execution/streaming/state/StateStore.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index ce09677e4aa5..e61d95a1b1bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -128,6 +128,10 @@ object StateStore extends Logging { @GuardedBy("loadedProviders") private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]() + /** + * Runs the `task` periodically and automatically cancels it if there is an exception. `onError` + * will be called when an exception happens. + */ class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) { private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task") @@ -210,7 +214,7 @@ object StateStore extends Logging { /** Start the periodic maintenance task if not already started and if Spark active */ private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized { val env = SparkEnv.get - if (env != null && (maintenanceTask == null || !maintenanceTask.isRunning)) { + if (env != null && !isMaintenanceRunning) { val periodMs = env.conf.getTimeAsMs( MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s") maintenanceTask = new MaintenanceTask(