Skip to content

Commit ea407a6

Browse files
author
Ahmed Al-Obaidi
committed
Scalable Memory option for HDFSBackedStateStore
Allow configuration option to unload loadedMaps from memory after commit
1 parent 93df3cd commit ea407a6

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
132132
commitUpdates(newVersion, mapToUpdate, compressedStream)
133133
state = COMMITTED
134134
logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile")
135+
if (unloadAfterCommit) {
136+
synchronized {
137+
loadedMaps.values.foreach(_.clear())
138+
loadedMaps.clear()
139+
}
140+
}
135141
newVersion
136142
} catch {
137143
case NonFatal(e) =>
@@ -206,6 +212,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
206212
this.storeConf = storeConf
207213
this.hadoopConf = hadoopConf
208214
fm.mkdirs(baseDir)
215+
this.unloadAfterCommit = storeConf.confs
216+
.getOrElse("spark.sql.streaming.stateStore.unloadAfterCommit", "false").toBoolean
209217
}
210218

211219
override def stateStoreId: StateStoreId = stateStoreId_
@@ -241,6 +249,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
241249
@volatile private var valueSchema: StructType = _
242250
@volatile private var storeConf: StateStoreConf = _
243251
@volatile private var hadoopConf: Configuration = _
252+
@volatile private var unloadAfterCommit: Boolean = _
244253

245254
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
246255
private lazy val baseDir = stateStoreId.storeCheckpointLocation()

0 commit comments

Comments
 (0)