Skip to content

Commit cab25df

Browse files
committed
SPARK-24717 Split out min retain version of state for memory in HDFSBackedStateStoreProvider
* introduce BoundedSortedMap which implements bounded size of sorted map * only first N elements will be retained * replace loadedMaps to BoundedSortedMap to retain only N versions of states * no need to cleanup in maintenance phase * introduce new configuration: spark.sql.streaming.minBatchesToRetainInMemory
1 parent 85fe129 commit cab25df

File tree

5 files changed

+252
-10
lines changed

5 files changed

+252
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,13 @@ object SQLConf {
825825
.intConf
826826
.createWithDefault(100)
827827

828+
val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
829+
.internal()
830+
.doc("The maximum number of batches which will be retained in memory to avoid " +
831+
"loading from files.")
832+
.intConf
833+
.createWithDefault(2)
834+
828835
val UNSUPPORTED_OPERATION_CHECK_ENABLED =
829836
buildConf("spark.sql.streaming.unsupportedOperationCheck")
830837
.internal()
@@ -1463,6 +1470,8 @@ class SQLConf extends Serializable with Logging {
14631470

14641471
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
14651472

1473+
def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
1474+
14661475
def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
14671476

14681477
def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.streaming.state;
18+
19+
import java.util.Comparator;
20+
import java.util.Map;
21+
import java.util.TreeMap;
22+
23+
/**
24+
* This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}.
25+
* <p/>
26+
* As TreeMap does, this implementation sorts elements in natural order, and cuts off
27+
* smaller elements to retain at most bigger N elements.
28+
* <p/>
29+
* You can provide reversed order of comparator to retain smaller elements instead.
30+
* <p/>
31+
* This class is not thread-safe, so synchronization would be needed to use this concurrently.
32+
*
33+
* @param <K> key type
34+
* @param <V> value type
35+
*/
36+
public class BoundedSortedMap<K, V> extends TreeMap<K, V> {
37+
38+
private final int limit;
39+
40+
/**
41+
* Constructor
42+
*
43+
* @param comparator comparator instance to compare between keys
44+
* @param limit bounded size
45+
*/
46+
public BoundedSortedMap(Comparator<K> comparator, int limit) {
47+
super(comparator);
48+
this.limit = limit;
49+
}
50+
51+
@Override
52+
public void putAll(Map<? extends K, ? extends V> map) {
53+
for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
54+
put(entry.getKey(), entry.getValue());
55+
}
56+
}
57+
58+
@Override
59+
public V put(K key, V value) {
60+
// This method doesn't guarantee thread-safety, so it must be guarded with synchronization
61+
62+
while (size() > limit) {
63+
remove(lastKey());
64+
}
65+
66+
if (size() == limit) {
67+
K lk = lastKey();
68+
if (comparator().compare(lk, key) > 0) {
69+
remove(lk);
70+
return super.put(key, value);
71+
}
72+
73+
// unchanged
74+
return null;
75+
}
76+
77+
return super.put(key, value);
78+
}
79+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.io.LZ4CompressionCodec
3535
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
3636
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
3737
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
38+
import org.apache.spark.sql.streaming.state.BoundedSortedMap
3839
import org.apache.spark.sql.types.StructType
3940
import org.apache.spark.util.{SizeEstimator, Utils}
4041

@@ -203,6 +204,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
203204
this.valueSchema = valueSchema
204205
this.storeConf = storeConf
205206
this.hadoopConf = hadoopConf
207+
this.numberOfVersionsRetainInMemory = storeConf.maxVersionsToRetainInMemory
208+
this.loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse,
209+
numberOfVersionsRetainInMemory)
206210
fm.mkdirs(baseDir)
207211
}
208212

@@ -220,7 +224,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
220224
}
221225

222226
override def close(): Unit = {
223-
loadedMaps.values.foreach(_.clear())
227+
loadedMaps.values.asScala.foreach(_.clear())
224228
}
225229

226230
override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
@@ -240,7 +244,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
240244
@volatile private var storeConf: StateStoreConf = _
241245
@volatile private var hadoopConf: Configuration = _
242246

243-
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
247+
// taking default value first: this will be updated by init method with configuration
248+
@volatile private var numberOfVersionsRetainInMemory: Int = 2
249+
@volatile private var loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse,
250+
numberOfVersionsRetainInMemory)
251+
244252
private lazy val baseDir = stateStoreId.storeCheckpointLocation()
245253
private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
246254
private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
@@ -260,7 +268,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
260268
*/
261269
private[state] def latestIterator(): Iterator[UnsafeRowPair] = synchronized {
262270
val versionsInFiles = fetchFiles().map(_.version).toSet
263-
val versionsLoaded = loadedMaps.keySet
271+
val versionsLoaded = loadedMaps.keySet.asScala
264272
val allKnownVersions = versionsInFiles ++ versionsLoaded
265273
val unsafeRowTuple = new UnsafeRowPair()
266274
if (allKnownVersions.nonEmpty) {
@@ -274,7 +282,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
274282
private def loadMap(version: Long): MapType = {
275283

276284
// Shortcut if the map for this version is already there to avoid a redundant put.
277-
val loadedCurrentVersionMap = synchronized { loadedMaps.get(version) }
285+
val loadedCurrentVersionMap = synchronized { Option(loadedMaps.get(version)) }
278286
if (loadedCurrentVersionMap.isDefined) {
279287
return loadedCurrentVersionMap.get
280288
}
@@ -302,7 +310,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
302310
lastAvailableMap = Some(new MapType)
303311
} else {
304312
lastAvailableMap =
305-
synchronized { loadedMaps.get(lastAvailableVersion) }
313+
synchronized { Option(loadedMaps.get(lastAvailableVersion)) }
306314
.orElse(readSnapshotFile(lastAvailableVersion))
307315
}
308316
}
@@ -506,7 +514,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
506514
val lastVersion = files.last.version
507515
val deltaFilesForLastVersion =
508516
filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
509-
synchronized { loadedMaps.get(lastVersion) } match {
517+
synchronized { Option(loadedMaps.get(lastVersion)) } match {
510518
case Some(map) =>
511519
if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
512520
val (_, e2) = Utils.timeTakenMs(writeSnapshotFile(lastVersion, map))
@@ -536,10 +544,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
536544
val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
537545
if (earliestVersionToRetain > 0) {
538546
val earliestFileToRetain = filesForVersion(files, earliestVersionToRetain).head
539-
synchronized {
540-
val mapsToRemove = loadedMaps.keys.filter(_ < earliestVersionToRetain).toSeq
541-
mapsToRemove.foreach(loadedMaps.remove)
542-
}
543547
val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
544548
val (_, e2) = Utils.timeTakenMs {
545549
filesToDelete.foreach { f =>

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ class StateStoreConf(@transient private val sqlConf: SQLConf)
3434
/** Minimum versions a State Store implementation should retain to allow rollbacks */
3535
val minVersionsToRetain: Int = sqlConf.minBatchesToRetain
3636

37+
/** Maximum count of versions a State Store implementation should retain in memory */
38+
val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
39+
3740
/**
3841
* Optional fully qualified name of the subclass of [[StateStoreProvider]]
3942
* managing state data. That is, the implementation of the State Store to use.
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package test.org.apache.spark.sql.streaming.state;
18+
19+
import org.apache.spark.sql.streaming.state.BoundedSortedMap;
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
import java.util.Comparator;
24+
25+
public class BoundedSortedMapSuite {
26+
27+
@Test
28+
public void testAddElementBelowBoundedCount() {
29+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
30+
Comparator.naturalOrder(), 2);
31+
32+
map.put(2, 2);
33+
map.put(1, 1);
34+
35+
Assert.assertEquals(2, map.size());
36+
Assert.assertEquals(Integer.valueOf(1), map.firstKey());
37+
Assert.assertEquals(Integer.valueOf(2), map.lastKey());
38+
}
39+
40+
@Test
41+
public void testAddSmallestElementToFullOfMap() {
42+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
43+
Comparator.naturalOrder(), 2);
44+
45+
map.put(2, 2);
46+
map.put(1, 1);
47+
48+
map.put(0, 0);
49+
50+
Assert.assertEquals(2, map.size());
51+
Assert.assertEquals(Integer.valueOf(0), map.firstKey());
52+
Assert.assertEquals(Integer.valueOf(1), map.lastKey());
53+
}
54+
55+
@Test
56+
public void testAddMiddleOfElementToFullOfMap() {
57+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
58+
Comparator.naturalOrder(), 2);
59+
60+
map.put(3, 3);
61+
map.put(1, 1);
62+
63+
// 3 is being cut off
64+
map.put(2, 2);
65+
66+
Assert.assertEquals(2, map.size());
67+
Assert.assertEquals(Integer.valueOf(1), map.firstKey());
68+
Assert.assertEquals(Integer.valueOf(2), map.lastKey());
69+
}
70+
71+
@Test
72+
public void testAddBiggestOfElementToFullOfMap() {
73+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
74+
Comparator.naturalOrder(), 2);
75+
76+
map.put(2, 2);
77+
map.put(1, 1);
78+
79+
// 3 is not added
80+
Assert.assertNull(map.put(3, 3));
81+
82+
Assert.assertEquals(2, map.size());
83+
Assert.assertEquals(Integer.valueOf(1), map.firstKey());
84+
Assert.assertEquals(Integer.valueOf(2), map.lastKey());
85+
}
86+
87+
@Test
88+
public void testReversedOrderAddElementBelowBoundedCount() {
89+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
90+
Comparator.reverseOrder(), 2);
91+
92+
map.put(2, 2);
93+
map.put(1, 1);
94+
95+
Assert.assertEquals(2, map.size());
96+
Assert.assertEquals(Integer.valueOf(2), map.firstKey());
97+
Assert.assertEquals(Integer.valueOf(1), map.lastKey());
98+
}
99+
100+
@Test
101+
public void testReversedOrderAddBiggestElementToFullOfMap() {
102+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
103+
Comparator.reverseOrder(), 2);
104+
105+
map.put(2, 2);
106+
map.put(1, 1);
107+
108+
// 1 is being cut off
109+
map.put(3, 3);
110+
111+
Assert.assertEquals(2, map.size());
112+
Assert.assertEquals(Integer.valueOf(3), map.firstKey());
113+
Assert.assertEquals(Integer.valueOf(2), map.lastKey());
114+
}
115+
116+
@Test
117+
public void testReversedOrderAddMiddleOfElementToFullOfMap() {
118+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
119+
Comparator.reverseOrder(), 2);
120+
121+
map.put(3, 3);
122+
map.put(1, 1);
123+
124+
// 1 is being cut off
125+
map.put(2, 2);
126+
127+
Assert.assertEquals(2, map.size());
128+
Assert.assertEquals(Integer.valueOf(3), map.firstKey());
129+
Assert.assertEquals(Integer.valueOf(2), map.lastKey());
130+
}
131+
132+
@Test
133+
public void testReversedOrderAddSmallestOfElementToFullOfMap() {
134+
BoundedSortedMap<Integer, Integer> map = new BoundedSortedMap<Integer, Integer>(
135+
Comparator.reverseOrder(), 2);
136+
137+
map.put(3, 3);
138+
map.put(2, 2);
139+
140+
// 1 is not added
141+
Assert.assertNull(map.put(1, 1));
142+
143+
Assert.assertEquals(2, map.size());
144+
Assert.assertEquals(Integer.valueOf(3), map.firstKey());
145+
Assert.assertEquals(Integer.valueOf(2), map.lastKey());
146+
}
147+
}

0 commit comments

Comments
 (0)