@@ -25,6 +25,7 @@ import scala.collection.mutable
2525import scala .util .control .NonFatal
2626
2727import com .google .common .io .ByteStreams
28+ import java .util
2829import org .apache .commons .io .IOUtils
2930import org .apache .hadoop .conf .Configuration
3031import org .apache .hadoop .fs ._
@@ -35,7 +36,6 @@ import org.apache.spark.io.LZ4CompressionCodec
3536import org .apache .spark .sql .catalyst .expressions .UnsafeRow
3637import org .apache .spark .sql .execution .streaming .CheckpointFileManager
3738import org .apache .spark .sql .execution .streaming .CheckpointFileManager .CancellableFSDataOutputStream
38- import org .apache .spark .sql .streaming .state .BoundedSortedMap
3939import org .apache .spark .sql .types .StructType
4040import org .apache .spark .util .{SizeEstimator , Utils }
4141
@@ -205,8 +205,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
205205 this .storeConf = storeConf
206206 this .hadoopConf = hadoopConf
207207 this .numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory
208- this .loadedMaps = new BoundedSortedMap [Long , MapType ](Ordering [Long ].reverse,
209- numberOfVersionsToRetainInMemory)
210208 fm.mkdirs(baseDir)
211209 }
212210
@@ -243,12 +241,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
243241 @ volatile private var valueSchema : StructType = _
244242 @ volatile private var storeConf : StateStoreConf = _
245243 @ volatile private var hadoopConf : Configuration = _
244+ @ volatile private var numberOfVersionsToRetainInMemory : Int = _
246245
247- // taking default value first: this will be updated by init method with configuration
248- @ volatile private var numberOfVersionsToRetainInMemory : Int = 2
249- @ volatile private var loadedMaps = new BoundedSortedMap [Long , MapType ](Ordering [Long ].reverse,
250- numberOfVersionsToRetainInMemory)
251-
246+ private lazy val loadedMaps = new util.TreeMap [Long , MapType ](Ordering [Long ].reverse)
252247 private lazy val baseDir = stateStoreId.storeCheckpointLocation()
253248 private lazy val fm = CheckpointFileManager .create(baseDir, hadoopConf)
254249 private lazy val sparkConf = Option (SparkEnv .get).map(_.conf).getOrElse(new SparkConf )
@@ -258,7 +253,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
/**
 * Under this provider's lock, calls `finalizeDeltaFile(output)` and then records `map` as the
 * state for `newVersion`. In this diff the direct `loadedMaps.put` is replaced by
 * `putStateIntoStateCache`.
 */
258253 private def commitUpdates (newVersion : Long , map : MapType , output : DataOutputStream ): Unit = {
259254 synchronized {
260255 finalizeDeltaFile(output)
261- loadedMaps.put (newVersion, map)
256+ putStateIntoStateCache (newVersion, map)
262257 }
263258 }
264259
@@ -278,6 +273,32 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
278273 } else Iterator .empty
279274 }
280275
/**
 * Returns a shallow copy of `loadedMaps`, taken while holding this provider's lock.
 *
 * Intended for unit tests only: the copy shares its values with the live cache, so callers
 * must not modify the maps it contains.
 */
private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] = synchronized {
  // The SortedMap copy constructor keeps the ordering of the source map.
  new util.TreeMap[Long, MapType](loadedMaps)
}
281+
/**
 * Caches `map` under `newVersion` in `loadedMaps`, keeping at most
 * `numberOfVersionsToRetainInMemory` entries.
 *
 * `loadedMaps` is ordered by descending version, so its last key is the oldest cached
 * version and is the first one evicted. When the retention count is zero or negative the
 * cache is emptied and the new state is not stored.
 *
 * @param newVersion version the state map belongs to
 * @param map state map to cache for that version
 */
private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized {
  if (numberOfVersionsToRetainInMemory <= 0) {
    // Nothing may be cached. Stop here so lastKey() is never called on an empty TreeMap,
    // where it throws NoSuchElementException.
    if (!loadedMaps.isEmpty) loadedMaps.clear()
  } else {
    // Drop surplus entries, oldest version first.
    while (loadedMaps.size() > numberOfVersionsToRetainInMemory) {
      loadedMaps.remove(loadedMaps.lastKey())
    }

    // The retention count is positive here, so a full cache is never empty.
    val cacheIsFull = loadedMaps.size() == numberOfVersionsToRetainInMemory
    if (cacheIsFull && loadedMaps.lastKey() > newVersion) {
      // The new version is older than everything cached: it would become the last key and
      // be evicted right away, so the put is skipped.
    } else {
      if (cacheIsFull && loadedMaps.lastKey() < newVersion) {
        // Make room by evicting the oldest version before inserting the new one.
        loadedMaps.remove(loadedMaps.lastKey())
      }
      // When the last key equals newVersion, put simply replaces that entry.
      loadedMaps.put(newVersion, map)
    }
  }
}
301+
281302 /** Load the required version of the map data from the backing files */
282303 private def loadMap (version : Long ): MapType = {
283304
@@ -294,7 +315,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
294315 val (result, elapsedMs) = Utils .timeTakenMs {
295316 val snapshotCurrentVersionMap = readSnapshotFile(version)
296317 if (snapshotCurrentVersionMap.isDefined) {
297- synchronized { loadedMaps.put (version, snapshotCurrentVersionMap.get) }
318+ synchronized { putStateIntoStateCache (version, snapshotCurrentVersionMap.get) }
298319 return snapshotCurrentVersionMap.get
299320 }
300321
@@ -322,7 +343,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
322343 updateFromDeltaFile(deltaVersion, resultMap)
323344 }
324345
325- synchronized { loadedMaps.put (version, resultMap) }
346+ synchronized { putStateIntoStateCache (version, resultMap) }
326347 resultMap
327348 }
328349
0 commit comments