From ea407a69c05f34af9587d299c4cc7a322382057a Mon Sep 17 00:00:00 2001 From: Ahmed Al-Obaidi Date: Sat, 31 Mar 2018 13:43:45 -0600 Subject: [PATCH] Scalable Memory option for HDFSBackedStateStore Allow configuration option to unload loadedMaps from memory after commit --- .../streaming/state/HDFSBackedStateStoreProvider.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index df722b953228b..ff4a1d0c52c4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -132,6 +132,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit commitUpdates(newVersion, mapToUpdate, compressedStream) state = COMMITTED logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile") + if (unloadAfterCommit) { + synchronized { + loadedMaps.values.foreach(_.clear()) + loadedMaps.clear() + } + } newVersion } catch { case NonFatal(e) => @@ -206,6 +212,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit this.storeConf = storeConf this.hadoopConf = hadoopConf fm.mkdirs(baseDir) + this.unloadAfterCommit = storeConf.confs + .getOrElse("spark.sql.streaming.stateStore.unloadAfterCommit", "false").toBoolean } override def stateStoreId: StateStoreId = stateStoreId_ @@ -241,6 +249,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit @volatile private var valueSchema: StructType = _ @volatile private var storeConf: StateStoreConf = _ @volatile private var hadoopConf: Configuration = _ + @volatile private var unloadAfterCommit: Boolean = _ private lazy val loadedMaps = new mutable.HashMap[Long, MapType] private lazy val baseDir = stateStoreId.storeCheckpointLocation()