diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 7d71f5242c27..f1e7f1d113ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -159,7 +159,7 @@ private[state] class HDFSBackedStateStoreProvider(
       } catch {
         case NonFatal(e) =>
           throw new IllegalStateException(
-            s"Error committing version $newVersion into ${HDFSBackedStateStoreProvider.this}", e)
+            s"Error committing version $newVersion into $this", e)
       }
     }
 
@@ -205,6 +205,10 @@ private[state] class HDFSBackedStateStoreProvider(
     override private[state] def hasCommitted: Boolean = {
       state == COMMITTED
     }
+
+    override def toString(): String = {
+      s"HDFSStateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    }
   }
 
   /** Get the state store for making updates to create a new `version` of the store. */
@@ -215,7 +219,7 @@ private[state] class HDFSBackedStateStoreProvider(
       newMap.putAll(loadMap(version))
     }
     val store = new HDFSBackedStateStore(version, newMap)
-    logInfo(s"Retrieved version $version of $this for update")
+    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
     store
   }
 
@@ -231,7 +235,7 @@ private[state] class HDFSBackedStateStoreProvider(
   }
 
   override def toString(): String = {
-    s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    s"HDFSStateStoreProvider[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
   }
 
   /* Internal classes and methods */
@@ -493,10 +497,12 @@ private[state] class HDFSBackedStateStoreProvider(
             val mapsToRemove = loadedMaps.keys.filter(_ < earliestVersionToRetain).toSeq
             mapsToRemove.foreach(loadedMaps.remove)
           }
-          files.filter(_.version < earliestFileToRetain.version).foreach { f =>
+          val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
+          filesToDelete.foreach { f =>
             fs.delete(f.path, true)
           }
-          logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this")
+          logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
+            filesToDelete.mkString(", "))
         }
       }
     } catch {
@@ -560,7 +566,7 @@ private[state] class HDFSBackedStateStoreProvider(
       }
     }
     val storeFiles = versionToFiles.values.toSeq.sortBy(_.version)
-    logDebug(s"Current set of files for $this: $storeFiles")
+    logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}")
     storeFiles
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index d945d7aff2da..267d17623d5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -38,7 +38,7 @@ private case class VerifyIfInstanceActive(storeId: StateStoreId, executorId: Str
 private case class GetLocation(storeId: StateStoreId)
   extends StateStoreCoordinatorMessage
 
-private case class DeactivateInstances(storeRootLocation: String)
+private case class DeactivateInstances(checkpointLocation: String)
   extends StateStoreCoordinatorMessage
 
 private object StopCoordinator
@@ -111,11 +111,13 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
+  extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreId, ExecutorCacheTaskLocation]
 
   override def receive: PartialFunction[Any, Unit] = {
     case ReportActiveInstance(id, host, executorId) =>
+      logDebug(s"Reported state store $id is active at $executorId")
       instances.put(id, ExecutorCacheTaskLocation(host, executorId))
   }
 
@@ -125,19 +127,25 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
         case Some(location) => location.executorId == execId
         case None => false
       }
+      logDebug(s"Verified that state store $id is active: $response")
       context.reply(response)
 
     case GetLocation(id) =>
-      context.reply(instances.get(id).map(_.toString))
+      val executorId = instances.get(id).map(_.toString)
+      logDebug(s"Got location of the state store $id: $executorId")
+      context.reply(executorId)
 
-    case DeactivateInstances(loc) =>
+    case DeactivateInstances(checkpointLocation) =>
       val storeIdsToRemove =
-        instances.keys.filter(_.checkpointLocation == loc).toSeq
+        instances.keys.filter(_.checkpointLocation == checkpointLocation).toSeq
       instances --= storeIdsToRemove
+      logDebug(s"Deactivating instances related to checkpoint location $checkpointLocation: " +
+        storeIdsToRemove.mkString(", "))
       context.reply(true)
 
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
+      logInfo("StateStoreCoordinator stopped")
      context.reply(true)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 06f1bd6c3bcc..fcf300b3c81b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -367,7 +367,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
+      // Make maintenance thread do snapshots and cleanups very fast
       .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
+      // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
+      // fails to talk to the StateStoreCoordinator and unloads all the StateStores
       .set("spark.rpc.numRetries", "1")
     val opId = 0
     val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
@@ -377,37 +380,49 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
+    var latestStoreVersion = 0
+
+    def generateStoreVersions() {
+      for (i <- 1 to 20) {
+        val store = StateStore.get(
+          storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
+        put(store, "a", i)
+        store.commit()
+        latestStoreVersion += 1
+      }
+    }
 
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
 
-          for (i <- 1 to 20) {
-            val store = StateStore.get(
-              storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
-            put(store, "a", i)
-            store.commit()
-          }
+          // Generate sufficient versions of store for snapshots
+          generateStoreVersions()
 
           eventually(timeout(10 seconds)) {
+            // Store should have been reported to the coordinator
             assert(coordinatorRef.getLocation(storeId).nonEmpty,
               "active instance was not reported")
-          }
 
-          // Background maintenance should clean up and generate snapshots
-          assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
-
-          eventually(timeout(10 seconds)) {
-            // Earliest delta file should get cleaned up
-            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+            // Background maintenance should clean up and generate snapshots
+            assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
 
             // Some snapshots should have been generated
-            val snapshotVersions = (0 to 20).filter { version =>
+            val snapshotVersions = (1 to latestStoreVersion).filter { version =>
               fileExists(provider, version, isSnapshot = true)
             }
             assert(snapshotVersions.nonEmpty, "no snapshot file found")
           }
 
+          // Generate more versions such that there is another snapshot and
+          // the earliest delta file will be cleaned up
+          generateStoreVersions()
+
+          // Earliest delta file should get cleaned up
+          eventually(timeout(10 seconds)) {
+            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+          }
+
           // If driver decides to deactivate all instances of the store, then this instance
           // should be unloaded
           coordinatorRef.deactivateInstances(dir)
@@ -416,7 +431,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }
 
           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
 
           // If some other executor loads the store, then this instance should be unloaded
@@ -426,14 +441,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }
 
           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
         }
       }
 
       // Verify if instance is unloaded if SparkContext is stopped
-      require(SparkEnv.get === null)
       eventually(timeout(10 seconds)) {
+        require(SparkEnv.get === null)
         assert(!StateStore.isLoaded(storeId))
         assert(!StateStore.isMaintenanceRunning)
       }
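
Reviewer note on the `$this` vs `${HDFSBackedStateStoreProvider.this}` churn in the first file: once the inner `HDFSBackedStateStore` gains its own `toString`, an unqualified `this` inside the inner class interpolates the store's description, while the qualified form targets the enclosing provider. Below is a minimal, self-contained sketch of that Scala scoping rule; `Outer` and `Inner` are hypothetical stand-ins for the provider and store, not names from this patch.

// Hypothetical stand-ins: Outer ~ HDFSBackedStateStoreProvider, Inner ~ HDFSBackedStateStore.
object OuterThisDemo extends App {
  class Outer {
    override def toString: String = "Outer[provider]"

    class Inner {
      override def toString: String = "Inner[store]"

      def describe(): Unit = {
        // Unqualified `this` resolves to the innermost enclosing instance (Inner).
        println(s"unqualified: $this")          // prints: unqualified: Inner[store]
        // `Outer.this` explicitly references the enclosing Outer instance.
        println(s"qualified:   ${Outer.this}")  // prints: qualified:   Outer[provider]
      }
    }
  }

  val outer = new Outer
  val inner = new outer.Inner
  inner.describe()
}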
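
On the StateStoreSuite restructuring: the de-flaking comes mostly from moving assertions (including the `require(SparkEnv.get === null)` check) inside `eventually` blocks, so the whole block is retried while the background maintenance thread catches up, instead of asserting once against a racing thread. A tiny illustrative sketch of that ScalaTest pattern, assuming a scalatest dependency on the classpath; the `flag` variable and object name are invented for the example.

// Illustrative sketch only: `eventually` retries the whole block until it
// passes or the timeout elapses, so slow background work cannot fail the test.
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyDemo extends App {
  @volatile var flag = false  // stand-in for state produced by a background thread

  new Thread(new Runnable {
    override def run(): Unit = { Thread.sleep(100); flag = true }
  }).start()

  eventually(timeout(10.seconds)) {
    assert(flag, "background work not observed yet")  // retried until true
  }
  println("observed")
}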