From cab25dfd8599a2edfdefe83ad9b9be1f827aaad0 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 3 Jul 2018 07:04:49 +0900 Subject: [PATCH 01/13] SPARK-24717 Split out min retain version of state for memory in HDFSBackedStateStoreProvider * introduce BoundedSortedMap which implements bounded size of sorted map * only first N elements will be retained * replace loadedMaps to BoundedSortedMap to retain only N versions of states * no need to cleanup in maintenance phase * introduce new configuration: spark.sql.streaming.minBatchesToRetainInMemory --- .../apache/spark/sql/internal/SQLConf.scala | 9 ++ .../sql/streaming/state/BoundedSortedMap.java | 79 ++++++++++ .../state/HDFSBackedStateStoreProvider.scala | 24 +-- .../streaming/state/StateStoreConf.scala | 3 + .../state/BoundedSortedMapSuite.java | 147 ++++++++++++++++++ 5 files changed, 252 insertions(+), 10 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e2c48e2d8a14c..0e65e30636474 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -825,6 +825,13 @@ object SQLConf { .intConf .createWithDefault(100) + val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory") + .internal() + .doc("The maximum number of batches which will be retained in memory to avoid " + + "loading from files.") + .intConf + .createWithDefault(2) + val UNSUPPORTED_OPERATION_CHECK_ENABLED = buildConf("spark.sql.streaming.unsupportedOperationCheck") .internal() @@ -1463,6 +1470,8 @@ class SQLConf extends Serializable with Logging { def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) + def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY) + def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED) def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED) diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java new file mode 100644 index 0000000000000..914d497d25d16 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class implements a bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, and cuts off
+ * the biggest elements so that at most the N smallest elements are retained.
+ *
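+ * For example, with a natural-order comparator and a bound of 2, putting keys in the
+ * order 3, 1, 2 ends up with {1, 2}: once the bound is reached, the biggest key 3 is
+ * evicted to make room for 2.
+ *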
+ * You can provide a reversed comparator to retain the biggest elements instead.
+ *
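+ * HDFSBackedStateStoreProvider does exactly that: it passes Ordering[Long].reverse so
+ * that only the latest N versions of state survive in memory.
+ *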
+ * This class is not thread-safe, so synchronization would be needed to use this concurrently. + * + * @param key type + * @param value type + */ +public class BoundedSortedMap extends TreeMap { + + private final int limit; + + /** + * Constructor + * + * @param comparator comparator instance to compare between keys + * @param limit bounded size + */ + public BoundedSortedMap(Comparator comparator, int limit) { + super(comparator); + this.limit = limit; + } + + @Override + public void putAll(Map map) { + for (Map.Entry entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public V put(K key, V value) { + // This method doesn't guarantee thread-safety, so it must be guarded with synchronization + + while (size() > limit) { + remove(lastKey()); + } + + if (size() == limit) { + K lk = lastKey(); + if (comparator().compare(lk, key) > 0) { + remove(lk); + return super.put(key, value); + } + + // unchanged + return null; + } + + return super.put(key, value); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 118c82aa75e68..5dff066856249 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -35,6 +35,7 @@ import org.apache.spark.io.LZ4CompressionCodec import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream +import org.apache.spark.sql.streaming.state.BoundedSortedMap import org.apache.spark.sql.types.StructType import org.apache.spark.util.{SizeEstimator, Utils} @@ -203,6 +204,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit this.valueSchema = valueSchema this.storeConf = storeConf this.hadoopConf = hadoopConf + this.numberOfVersionsRetainInMemory = storeConf.maxVersionsToRetainInMemory + this.loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse, + numberOfVersionsRetainInMemory) fm.mkdirs(baseDir) } @@ -220,7 +224,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } override def close(): Unit = { - loadedMaps.values.foreach(_.clear()) + loadedMaps.values.asScala.foreach(_.clear()) } override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = { @@ -240,7 +244,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit @volatile private var storeConf: StateStoreConf = _ @volatile private var hadoopConf: Configuration = _ - private lazy val loadedMaps = new mutable.HashMap[Long, MapType] + // taking default value first: this will be updated by init method with configuration + @volatile private var numberOfVersionsRetainInMemory: Int = 2 + @volatile private var loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse, + numberOfVersionsRetainInMemory) + private lazy val baseDir = stateStoreId.storeCheckpointLocation() private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) @@ -260,7 +268,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit */ private[state] 
def latestIterator(): Iterator[UnsafeRowPair] = synchronized { val versionsInFiles = fetchFiles().map(_.version).toSet - val versionsLoaded = loadedMaps.keySet + val versionsLoaded = loadedMaps.keySet.asScala val allKnownVersions = versionsInFiles ++ versionsLoaded val unsafeRowTuple = new UnsafeRowPair() if (allKnownVersions.nonEmpty) { @@ -274,7 +282,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private def loadMap(version: Long): MapType = { // Shortcut if the map for this version is already there to avoid a redundant put. - val loadedCurrentVersionMap = synchronized { loadedMaps.get(version) } + val loadedCurrentVersionMap = synchronized { Option(loadedMaps.get(version)) } if (loadedCurrentVersionMap.isDefined) { return loadedCurrentVersionMap.get } @@ -302,7 +310,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit lastAvailableMap = Some(new MapType) } else { lastAvailableMap = - synchronized { loadedMaps.get(lastAvailableVersion) } + synchronized { Option(loadedMaps.get(lastAvailableVersion)) } .orElse(readSnapshotFile(lastAvailableVersion)) } } @@ -506,7 +514,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val lastVersion = files.last.version val deltaFilesForLastVersion = filesForVersion(files, lastVersion).filter(_.isSnapshot == false) - synchronized { loadedMaps.get(lastVersion) } match { + synchronized { Option(loadedMaps.get(lastVersion)) } match { case Some(map) => if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) { val (_, e2) = Utils.timeTakenMs(writeSnapshotFile(lastVersion, map)) @@ -536,10 +544,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain if (earliestVersionToRetain > 0) { val earliestFileToRetain = filesForVersion(files, earliestVersionToRetain).head - synchronized { - val mapsToRemove = loadedMaps.keys.filter(_ < earliestVersionToRetain).toSeq - mapsToRemove.foreach(loadedMaps.remove) - } val filesToDelete = files.filter(_.version < earliestFileToRetain.version) val (_, e2) = Utils.timeTakenMs { filesToDelete.foreach { f => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 765ff076cb467..d145082a39b57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -34,6 +34,9 @@ class StateStoreConf(@transient private val sqlConf: SQLConf) /** Minimum versions a State Store implementation should retain to allow rollbacks */ val minVersionsToRetain: Int = sqlConf.minBatchesToRetain + /** Maximum count of versions a State Store implementation should retain in memory */ + val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory + /** * Optional fully qualified name of the subclass of [[StateStoreProvider]] * managing state data. That is, the implementation of the State Store to use. 
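To make the intended behavior concrete, here is a minimal Scala sketch (illustrative
only, not part of the patch; String stands in for the provider's MapType) of how the
new configuration and the bounded map work together:

  import org.apache.spark.sql.streaming.state.BoundedSortedMap

  // from spark.sql.streaming.maxBatchesToRetainInMemory via StateStoreConf
  val maxVersionsToRetainInMemory = 2

  // the reversed ordering makes the largest version the "first" key, so the map
  // always evicts the oldest cached versions
  val loadedMaps =
    new BoundedSortedMap[Long, String](Ordering[Long].reverse, maxVersionsToRetainInMemory)

  loadedMaps.put(1L, "state of version 1")
  loadedMaps.put(2L, "state of version 2")
  loadedMaps.put(3L, "state of version 3")  // version 1, the smallest key, is evicted

  assert(loadedMaps.size == 2)
  assert(!loadedMaps.containsKey(1L))

Because the comparator is reversed, lastKey() is always the oldest cached version,
which is exactly the entry that gets dropped when the bound is exceeded.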
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java new file mode 100644 index 0000000000000..3fe48fab368b5 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package test.org.apache.spark.sql.streaming.state; + +import org.apache.spark.sql.streaming.state.BoundedSortedMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Comparator; + +public class BoundedSortedMapSuite { + + @Test + public void testAddElementBelowBoundedCount() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 2); + + map.put(2, 2); + map.put(1, 1); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(Integer.valueOf(1), map.firstKey()); + Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + } + + @Test + public void testAddSmallestElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 2); + + map.put(2, 2); + map.put(1, 1); + + map.put(0, 0); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(Integer.valueOf(0), map.firstKey()); + Assert.assertEquals(Integer.valueOf(1), map.lastKey()); + } + + @Test + public void testAddMiddleOfElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 2); + + map.put(3, 3); + map.put(1, 1); + + // 3 is being cut off + map.put(2, 2); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(Integer.valueOf(1), map.firstKey()); + Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + } + + @Test + public void testAddBiggestOfElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 2); + + map.put(2, 2); + map.put(1, 1); + + // 3 is not added + Assert.assertNull(map.put(3, 3)); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(Integer.valueOf(1), map.firstKey()); + Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + } + + @Test + public void testReversedOrderAddElementBelowBoundedCount() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 2); + + map.put(2, 2); + map.put(1, 1); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(Integer.valueOf(2), map.firstKey()); + Assert.assertEquals(Integer.valueOf(1), map.lastKey()); + } + + @Test + public void testReversedOrderAddBiggestElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 2); + + map.put(2, 2); + map.put(1, 1); + + // 1 is being cut off + map.put(3, 3); + + Assert.assertEquals(2, map.size()); + 
Assert.assertEquals(Integer.valueOf(3), map.firstKey()); + Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + } + + @Test + public void testReversedOrderAddMiddleOfElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 2); + + map.put(3, 3); + map.put(1, 1); + + // 1 is being cut off + map.put(2, 2); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(Integer.valueOf(3), map.firstKey()); + Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + } + + @Test + public void testReversedOrderAddSmallestOfElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 2); + + map.put(3, 3); + map.put(2, 2); + + // 1 is not added + Assert.assertNull(map.put(1, 1)); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(Integer.valueOf(3), map.firstKey()); + Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + } +} From 345b33ab5b9042eb7be86b2993dc9b6306480f5d Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 3 Jul 2018 08:40:22 +0900 Subject: [PATCH 02/13] Fix javadoc style check --- .../apache/spark/sql/streaming/state/BoundedSortedMap.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java index 914d497d25d16..216147e322b0f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java @@ -22,12 +22,12 @@ /** * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. - *
+ *
 * As TreeMap does, this implementation sorts elements in natural order, and cuts off
 * the biggest elements so that at most the N smallest elements are retained.
- *
+ *
 * You can provide a reversed comparator to retain the biggest elements instead.
- *
+ * * This class is not thread-safe, so synchronization would be needed to use this concurrently. * * @param key type From c50da7b40645e8c6d8c1530cf3497ef2d3a09857 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Wed, 4 Jul 2018 10:37:07 +0900 Subject: [PATCH 03/13] Elaborate on values which would be expected to be used normally --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0e65e30636474..1c9b047e59748 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -828,7 +828,9 @@ object SQLConf { val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory") .internal() .doc("The maximum number of batches which will be retained in memory to avoid " + - "loading from files.") + "loading from files. The value adjusts a trade-off between memory usage vs cache miss: " + + "'2' covers both success and direct failure cases, '1' covers only success case, " + + "and '0' covers extreme case - disable cache to maximize memory size of executors.") .intConf .createWithDefault(2) From d8b4bb84bc9216ebe9b31f8992e6d59e975b377d Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Wed, 4 Jul 2018 11:00:56 +0900 Subject: [PATCH 04/13] Fix java checkstyle via removing whitespace --- .../org/apache/spark/sql/streaming/state/BoundedSortedMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java index 216147e322b0f..73ef0d2bbb255 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java @@ -27,7 +27,7 @@ * smaller elements to retain at most bigger N elements. * * You can provide reversed order of comparator to retain smaller elements instead. - * + * * This class is not thread-safe, so synchronization would be needed to use this concurrently. * * @param key type From 2fd7b7ce4f703635c703339fb4990024cfa74255 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 5 Jul 2018 21:32:02 +0900 Subject: [PATCH 05/13] Apply some optimization into putAll, and add various tests on putAll --- .../sql/streaming/state/BoundedSortedMap.java | 101 +++++- .../state/BoundedSortedMapSuite.java | 305 ++++++++++++++++-- 2 files changed, 363 insertions(+), 43 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java index 73ef0d2bbb255..86a04b6999a45 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java @@ -18,22 +18,23 @@ import java.util.Comparator; import java.util.Map; +import java.util.SortedMap; import java.util.TreeMap; /** * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. - * + *
 * As TreeMap does, this implementation sorts elements in natural order, and cuts off
 * the biggest elements so that at most the N smallest elements are retained.
- *
+ *
 * You can provide a reversed comparator to retain the biggest elements instead.
- *
+ *
* This class is not thread-safe, so synchronization would be needed to use this concurrently. * * @param key type * @param value type */ -public class BoundedSortedMap extends TreeMap { +public final class BoundedSortedMap extends TreeMap { private final int limit; @@ -41,33 +42,29 @@ public class BoundedSortedMap extends TreeMap { * Constructor * * @param comparator comparator instance to compare between keys - * @param limit bounded size + * @param limit bounded size */ public BoundedSortedMap(Comparator comparator, int limit) { super(comparator); this.limit = limit; } - @Override - public void putAll(Map map) { - for (Map.Entry entry : map.entrySet()) { - put(entry.getKey(), entry.getValue()); - } - } - @Override public V put(K key, V value) { // This method doesn't guarantee thread-safety, so it must be guarded with synchronization - - while (size() > limit) { - remove(lastKey()); + if (size() > limit) { + throw new IllegalStateException("BoundedSortedMap is broken: already out of bound."); } if (size() == limit) { K lk = lastKey(); - if (comparator().compare(lk, key) > 0) { + int comp = comparator().compare(lk, key); + if (comp > 0) { remove(lk); return super.put(key, value); + } else if (comp == 0) { + // just overwrite it without explicitly removing + return super.put(key, value); } // unchanged @@ -76,4 +73,74 @@ public V put(K key, V value) { return super.put(key, value); } -} + + @Override + public void putAll(Map map) { + if (size() > limit) { + throw new IllegalStateException("BoundedSortedMap is broken: already out of bound."); + } + + if (isApplicableOfPutAll0(map)) { + SortedMap paramMap = (SortedMap) map; + putAll0(paramMap); + } else { + // fail back to put all entries one by one + for (Map.Entry entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + } + + private boolean isApplicableOfPutAll0(Map map) { + if (map instanceof SortedMap) { + SortedMap paramMap = (SortedMap) map; + + return (paramMap.comparator() == null && comparator() == null) || ( + paramMap.comparator() != null && paramMap.comparator().equals(comparator())); + } + + return false; + } + + private void putAll0(SortedMap map) { + // assuming isApplicableOfPutAll0 returns true, which means comparators in both maps + // guarantee same ordering of key + + // if first key of this map is bigger (if it's natural ordering) than last key of parameter, + // all of elements in this map will be evicted. + // clear the map and put parameter's entries until reaching limit. + K fk = firstKey(); + if (comparator().compare(fk, map.lastKey()) > 0) { + clear(); + for (Map.Entry entry : map.entrySet()) { + // safe to directly call super.put + super.put(entry.getKey(), entry.getValue()); + + if (size() == limit) { + break; + } + } + + return; + } + + for (Map.Entry entry : map.entrySet()) { + if (size() == limit) { + K lk = lastKey(); + + // if the map is reached the limit and last key of this map is smaller than first key of + // parameter map, no need to add remaining entries from parameter map. 
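+      // (both maps are iterated under the same comparator here, so every remaining
+      // entry of the parameter map would be rejected as well)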
+ if (comparator().compare(lk, entry.getKey()) < 0) { + return; + } else { + // remove last key to ensure free space before putting new entity + remove(lk); + } + } + + // safe to directly call super.put + super.put(entry.getKey(), entry.getValue()); + } + } + +} \ No newline at end of file diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java index 3fe48fab368b5..035542af62bd8 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java @@ -16,11 +16,20 @@ */ package test.org.apache.spark.sql.streaming.state; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.spark.sql.streaming.state.BoundedSortedMap; import org.junit.Assert; import org.junit.Test; import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; public class BoundedSortedMapSuite { @@ -32,9 +41,8 @@ public void testAddElementBelowBoundedCount() { map.put(2, 2); map.put(1, 1); - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(1), map.firstKey()); - Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2)); + assertMap(map, expected); } @Test @@ -47,9 +55,8 @@ public void testAddSmallestElementToFullOfMap() { map.put(0, 0); - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(0), map.firstKey()); - Assert.assertEquals(Integer.valueOf(1), map.lastKey()); + List> expected = Lists.newArrayList(IntPair.of(0, 0), IntPair.of(1, 1)); + assertMap(map, expected); } @Test @@ -63,9 +70,8 @@ public void testAddMiddleOfElementToFullOfMap() { // 3 is being cut off map.put(2, 2); - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(1), map.firstKey()); - Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2)); + assertMap(map, expected); } @Test @@ -77,11 +83,24 @@ public void testAddBiggestOfElementToFullOfMap() { map.put(1, 1); // 3 is not added - Assert.assertNull(map.put(3, 3)); + map.put(3, 3); + + List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2)); + assertMap(map, expected); + } + + @Test + public void testOverwriteBiggestOfElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 2); + + map.put(2, 2); + map.put(1, 1); - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(1), map.firstKey()); - Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + map.put(2, 3); + + List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 3)); + assertMap(map, expected); } @Test @@ -92,9 +111,8 @@ public void testReversedOrderAddElementBelowBoundedCount() { map.put(2, 2); map.put(1, 1); - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(2), map.firstKey()); - Assert.assertEquals(Integer.valueOf(1), map.lastKey()); + List> expected = Lists.newArrayList(IntPair.of(2, 2), IntPair.of(1, 1)); + assertMap(map, expected); } @Test @@ -108,9 +126,8 @@ public void 
testReversedOrderAddBiggestElementToFullOfMap() { // 1 is being cut off map.put(3, 3); - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(3), map.firstKey()); - Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 2)); + assertMap(map, expected); } @Test @@ -124,9 +141,8 @@ public void testReversedOrderAddMiddleOfElementToFullOfMap() { // 1 is being cut off map.put(2, 2); - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(3), map.firstKey()); - Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 2)); + assertMap(map, expected); } @Test @@ -138,10 +154,247 @@ public void testReversedOrderAddSmallestOfElementToFullOfMap() { map.put(2, 2); // 1 is not added - Assert.assertNull(map.put(1, 1)); + map.put(1, 1); + + List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 2)); + assertMap(map, expected); + } + + @Test + public void testReversedOrderOverwriteSmallestOfElementToFullOfMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 2); + + map.put(3, 3); + map.put(2, 2); + + map.put(2, 3); + + List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 3)); + assertMap(map, expected); + } + + @Test + public void testPutAllNonSortedMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 3); + + map.put(1, 1); + map.put(2, 2); + + Map param = new HashMap<>(); + param.put(2, 12); + param.put(1, 11); + param.put(3, 13); + param.put(4, 14); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(1, 11), IntPair.of(2, 12), + IntPair.of(3, 13)); + assertMap(map, expected); + } + + @Test + public void testPutAllSortedMapWithSameOrderingAllKeysAreSmallerThanFirstKey() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 3); + + map.put(7, 7); + map.put(8, 8); + + SortedMap param = new TreeMap<>(Comparator.naturalOrder()); + param.put(1, 11); + param.put(2, 12); + param.put(3, 13); + param.put(4, 14); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(1, 11), IntPair.of(2, 12), + IntPair.of(3, 13)); + assertMap(map, expected); + } + + @Test + public void testPutAllSortedMapWithSameOrderingAllKeysAreBiggerThanLastKey() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 3); + + map.put(1, 1); + map.put(2, 2); + + SortedMap param = new TreeMap<>(Comparator.naturalOrder()); + param.put(3, 13); + param.put(4, 14); + param.put(5, 15); + param.put(6, 16); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2), + IntPair.of(3, 13)); + assertMap(map, expected); + } + + @Test + public void testPutAllSortedMapWithSameOrderingOverlappedKeyRange() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 4); + + map.put(1, 1); + map.put(3, 3); + + SortedMap param = new TreeMap<>(Comparator.naturalOrder()); + param.put(2, 12); + param.put(4, 14); + param.put(6, 16); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 12), + IntPair.of(3, 3), IntPair.of(4, 14)); + assertMap(map, expected); + } + + @Test + public void testPutAllSortedMapWithDifferentOrdering() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.naturalOrder(), 4); + + map.put(1, 1); + map.put(3, 3); + + SortedMap param = new TreeMap<>(Comparator.reverseOrder()); + param.put(6, 
16); + param.put(4, 14); + param.put(2, 12); + + map.putAll(param); + + // respecting map's ordering + List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 12), + IntPair.of(3, 3), IntPair.of(4, 14)); + assertMap(map, expected); + } + + @Test + public void testReversedOrderPutAllNonSortedMap() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 3); + + map.put(2, 2); + map.put(1, 1); + + Map param = new HashMap<>(); + param.put(2, 12); + param.put(1, 11); + param.put(3, 13); + param.put(4, 14); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(4, 14), IntPair.of(3, 13), + IntPair.of(2, 12)); + assertMap(map, expected); + } + + @Test + public void testReversedOrderPutAllSortedMapWithSameOrderingAllKeysAreSmallerThanLastKey() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 3); + + map.put(9, 9); + map.put(8, 8); + map.put(7, 7); + + SortedMap param = new TreeMap<>(Comparator.reverseOrder()); + param.put(1, 11); + param.put(2, 12); + param.put(3, 13); + param.put(4, 14); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(9, 9), IntPair.of(8, 8), + IntPair.of(7, 7)); + assertMap(map, expected); + } + + @Test + public void testReversedOrderPutAllSortedMapWithSameOrderingAllKeysAreBiggerThanFirstKey() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 3); + + map.put(2, 2); + map.put(1, 1); + + SortedMap param = new TreeMap<>(Comparator.reverseOrder()); + param.put(3, 13); + param.put(4, 14); + param.put(5, 15); + param.put(6, 16); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(6, 16), IntPair.of(5, 15), + IntPair.of(4, 14)); + assertMap(map, expected); + } + + @Test + public void testReversedOrderPutAllSortedMapWithSameOrderingOverlappedKeyRange() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 4); + + map.put(3, 3); + map.put(1, 1); + + SortedMap param = new TreeMap<>(Comparator.reverseOrder()); + param.put(6, 16); + param.put(4, 14); + param.put(2, 12); + + map.putAll(param); + + List> expected = Lists.newArrayList(IntPair.of(6, 16), IntPair.of(4, 14), + IntPair.of(3, 3), IntPair.of(2, 12)); + assertMap(map, expected); + } + + @Test + public void testReversedOrderPutAllSortedMapWithDifferentOrdering() { + BoundedSortedMap map = new BoundedSortedMap( + Comparator.reverseOrder(), 4); + + map.put(3, 3); + map.put(1, 1); + + SortedMap param = new TreeMap<>(Comparator.naturalOrder()); + param.put(2, 12); + param.put(4, 14); + param.put(6, 16); + + map.putAll(param); + + // respecting map's ordering + List> expected = Lists.newArrayList(IntPair.of(6, 16), IntPair.of(4, 14), + IntPair.of(3, 3), IntPair.of(2, 12)); + assertMap(map, expected); + } + + private void assertMap(Map map, List> expectedEntities) { + Assert.assertEquals(expectedEntities.size(), map.size()); + Assert.assertEquals(expectedEntities.stream().map(Pair::getKey).collect(Collectors.toSet()), + map.keySet()); + + expectedEntities.forEach(entity -> Assert.assertEquals(entity.getValue(), + map.get(entity.getKey()))); + } - Assert.assertEquals(2, map.size()); - Assert.assertEquals(Integer.valueOf(3), map.firstKey()); - Assert.assertEquals(Integer.valueOf(2), map.lastKey()); + private static class IntPair { + static ImmutablePair of(Integer key, Integer value) { + return new ImmutablePair<>(key, value); + } } } From ee8b117d0ef2a389f1d08a262de80dd90009ec6e Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 5 Jul 2018 21:34:25 +0900 
Subject: [PATCH 06/13] Rename field as suggested in review comment --- .../spark/sql/streaming/state/BoundedSortedMap.java | 3 +-- .../streaming/state/HDFSBackedStateStoreProvider.scala | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java index 86a04b6999a45..d0f6419364900 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java @@ -142,5 +142,4 @@ private void putAll0(SortedMap map) { super.put(entry.getKey(), entry.getValue()); } } - -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 5dff066856249..e8d56dc19ccce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -204,9 +204,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit this.valueSchema = valueSchema this.storeConf = storeConf this.hadoopConf = hadoopConf - this.numberOfVersionsRetainInMemory = storeConf.maxVersionsToRetainInMemory + this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory this.loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse, - numberOfVersionsRetainInMemory) + numberOfVersionsToRetainInMemory) fm.mkdirs(baseDir) } @@ -245,9 +245,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit @volatile private var hadoopConf: Configuration = _ // taking default value first: this will be updated by init method with configuration - @volatile private var numberOfVersionsRetainInMemory: Int = 2 + @volatile private var numberOfVersionsToRetainInMemory: Int = 2 @volatile private var loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse, - numberOfVersionsRetainInMemory) + numberOfVersionsToRetainInMemory) private lazy val baseDir = stateStoreId.storeCheckpointLocation() private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) From 35892b5438b747b0542b04cee1efbe2e70951836 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 5 Jul 2018 22:29:24 +0900 Subject: [PATCH 07/13] Fix javadoc style check --- .../apache/spark/sql/streaming/state/BoundedSortedMap.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java index d0f6419364900..70085321fc9c1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java @@ -23,12 +23,12 @@ /** * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. - *
+ *
 * As TreeMap does, this implementation sorts elements in natural order, and cuts off
 * the biggest elements so that at most the N smallest elements are retained.
- *
+ *
 * You can provide a reversed comparator to retain the biggest elements instead.
- *
+ * * This class is not thread-safe, so synchronization would be needed to use this concurrently. * * @param key type From be44d9c6be7f267e1f00ea84d4ce9a80b9b6985c Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Wed, 11 Jul 2018 07:24:41 +0900 Subject: [PATCH 08/13] Move the core logic of BoundedSortedMap into HDFSBackedStateStoreProvider --- .../sql/streaming/state/BoundedSortedMap.java | 145 ------- .../state/HDFSBackedStateStoreProvider.scala | 43 +- .../state/BoundedSortedMapSuite.java | 400 ------------------ .../streaming/state/StateStoreSuite.scala | 62 ++- 4 files changed, 93 insertions(+), 557 deletions(-) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java delete mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java deleted file mode 100644 index 70085321fc9c1..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.streaming.state; - -import java.util.Comparator; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -/** - * This class implements bounded {@link java.util.SortedMap} based on {@link java.util.TreeMap}. - * - * As TreeMap does, this implementation sorts elements in natural order, and cuts off - * smaller elements to retain at most bigger N elements. - * - * You can provide reversed order of comparator to retain smaller elements instead. - * - * This class is not thread-safe, so synchronization would be needed to use this concurrently. 
- * - * @param key type - * @param value type - */ -public final class BoundedSortedMap extends TreeMap { - - private final int limit; - - /** - * Constructor - * - * @param comparator comparator instance to compare between keys - * @param limit bounded size - */ - public BoundedSortedMap(Comparator comparator, int limit) { - super(comparator); - this.limit = limit; - } - - @Override - public V put(K key, V value) { - // This method doesn't guarantee thread-safety, so it must be guarded with synchronization - if (size() > limit) { - throw new IllegalStateException("BoundedSortedMap is broken: already out of bound."); - } - - if (size() == limit) { - K lk = lastKey(); - int comp = comparator().compare(lk, key); - if (comp > 0) { - remove(lk); - return super.put(key, value); - } else if (comp == 0) { - // just overwrite it without explicitly removing - return super.put(key, value); - } - - // unchanged - return null; - } - - return super.put(key, value); - } - - @Override - public void putAll(Map map) { - if (size() > limit) { - throw new IllegalStateException("BoundedSortedMap is broken: already out of bound."); - } - - if (isApplicableOfPutAll0(map)) { - SortedMap paramMap = (SortedMap) map; - putAll0(paramMap); - } else { - // fail back to put all entries one by one - for (Map.Entry entry : map.entrySet()) { - put(entry.getKey(), entry.getValue()); - } - } - } - - private boolean isApplicableOfPutAll0(Map map) { - if (map instanceof SortedMap) { - SortedMap paramMap = (SortedMap) map; - - return (paramMap.comparator() == null && comparator() == null) || ( - paramMap.comparator() != null && paramMap.comparator().equals(comparator())); - } - - return false; - } - - private void putAll0(SortedMap map) { - // assuming isApplicableOfPutAll0 returns true, which means comparators in both maps - // guarantee same ordering of key - - // if first key of this map is bigger (if it's natural ordering) than last key of parameter, - // all of elements in this map will be evicted. - // clear the map and put parameter's entries until reaching limit. - K fk = firstKey(); - if (comparator().compare(fk, map.lastKey()) > 0) { - clear(); - for (Map.Entry entry : map.entrySet()) { - // safe to directly call super.put - super.put(entry.getKey(), entry.getValue()); - - if (size() == limit) { - break; - } - } - - return; - } - - for (Map.Entry entry : map.entrySet()) { - if (size() == limit) { - K lk = lastKey(); - - // if the map is reached the limit and last key of this map is smaller than first key of - // parameter map, no need to add remaining entries from parameter map. 
- if (comparator().compare(lk, entry.getKey()) < 0) { - return; - } else { - // remove last key to ensure free space before putting new entity - remove(lk); - } - } - - // safe to directly call super.put - super.put(entry.getKey(), entry.getValue()); - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index e8d56dc19ccce..b55cc7d97e868 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -25,6 +25,7 @@ import scala.collection.mutable import scala.util.control.NonFatal import com.google.common.io.ByteStreams +import java.util import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ @@ -35,7 +36,6 @@ import org.apache.spark.io.LZ4CompressionCodec import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream -import org.apache.spark.sql.streaming.state.BoundedSortedMap import org.apache.spark.sql.types.StructType import org.apache.spark.util.{SizeEstimator, Utils} @@ -205,8 +205,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit this.storeConf = storeConf this.hadoopConf = hadoopConf this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory - this.loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse, - numberOfVersionsToRetainInMemory) fm.mkdirs(baseDir) } @@ -243,12 +241,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit @volatile private var valueSchema: StructType = _ @volatile private var storeConf: StateStoreConf = _ @volatile private var hadoopConf: Configuration = _ + @volatile private var numberOfVersionsToRetainInMemory: Int = _ - // taking default value first: this will be updated by init method with configuration - @volatile private var numberOfVersionsToRetainInMemory: Int = 2 - @volatile private var loadedMaps = new BoundedSortedMap[Long, MapType](Ordering[Long].reverse, - numberOfVersionsToRetainInMemory) - + private lazy val loadedMaps = new util.TreeMap[Long, MapType](Ordering[Long].reverse) private lazy val baseDir = stateStoreId.storeCheckpointLocation() private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) @@ -258,7 +253,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = { synchronized { finalizeDeltaFile(output) - loadedMaps.put(newVersion, map) + putStateIntoStateCache(newVersion, map) } } @@ -278,6 +273,32 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } else Iterator.empty } + /** This method is intended to be only used for unit test(s). DO NOT TOUCH ELEMENTS IN MAP! 
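+   * (it returns a shallow clone: the map structure is copied, but the version maps
+   * inside are shared with the live cache)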
*/ + private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] = synchronized { + // shallow copy as a minimal guard + loadedMaps.clone().asInstanceOf[util.SortedMap[Long, MapType]] + } + + private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized { + while (loadedMaps.size() > numberOfVersionsToRetainInMemory) { + loadedMaps.remove(loadedMaps.lastKey()) + } + + val size = loadedMaps.size() + if (size == numberOfVersionsToRetainInMemory) { + val versionIdForLastKey = loadedMaps.lastKey() + if (versionIdForLastKey > newVersion) { + // this is the only case which put doesn't need + return + } else if (versionIdForLastKey < newVersion) { + // this case needs removal of the last key before putting new one + loadedMaps.remove(versionIdForLastKey) + } + } + + loadedMaps.put(newVersion, map) + } + /** Load the required version of the map data from the backing files */ private def loadMap(version: Long): MapType = { @@ -294,7 +315,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val (result, elapsedMs) = Utils.timeTakenMs { val snapshotCurrentVersionMap = readSnapshotFile(version) if (snapshotCurrentVersionMap.isDefined) { - synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) } + synchronized { putStateIntoStateCache(version, snapshotCurrentVersionMap.get) } return snapshotCurrentVersionMap.get } @@ -322,7 +343,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit updateFromDeltaFile(deltaVersion, resultMap) } - synchronized { loadedMaps.put(version, resultMap) } + synchronized { putStateIntoStateCache(version, resultMap) } resultMap } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java deleted file mode 100644 index 035542af62bd8..0000000000000 --- a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/state/BoundedSortedMapSuite.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package test.org.apache.spark.sql.streaming.state; - -import com.google.common.collect.Lists; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.spark.sql.streaming.state.BoundedSortedMap; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.stream.Collectors; - -public class BoundedSortedMapSuite { - - @Test - public void testAddElementBelowBoundedCount() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 2); - - map.put(2, 2); - map.put(1, 1); - - List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2)); - assertMap(map, expected); - } - - @Test - public void testAddSmallestElementToFullOfMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 2); - - map.put(2, 2); - map.put(1, 1); - - map.put(0, 0); - - List> expected = Lists.newArrayList(IntPair.of(0, 0), IntPair.of(1, 1)); - assertMap(map, expected); - } - - @Test - public void testAddMiddleOfElementToFullOfMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 2); - - map.put(3, 3); - map.put(1, 1); - - // 3 is being cut off - map.put(2, 2); - - List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2)); - assertMap(map, expected); - } - - @Test - public void testAddBiggestOfElementToFullOfMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 2); - - map.put(2, 2); - map.put(1, 1); - - // 3 is not added - map.put(3, 3); - - List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2)); - assertMap(map, expected); - } - - @Test - public void testOverwriteBiggestOfElementToFullOfMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 2); - - map.put(2, 2); - map.put(1, 1); - - map.put(2, 3); - - List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 3)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderAddElementBelowBoundedCount() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 2); - - map.put(2, 2); - map.put(1, 1); - - List> expected = Lists.newArrayList(IntPair.of(2, 2), IntPair.of(1, 1)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderAddBiggestElementToFullOfMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 2); - - map.put(2, 2); - map.put(1, 1); - - // 1 is being cut off - map.put(3, 3); - - List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 2)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderAddMiddleOfElementToFullOfMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 2); - - map.put(3, 3); - map.put(1, 1); - - // 1 is being cut off - map.put(2, 2); - - List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 2)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderAddSmallestOfElementToFullOfMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 2); - - map.put(3, 3); - map.put(2, 2); - - // 1 is not added - map.put(1, 1); - - List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 2)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderOverwriteSmallestOfElementToFullOfMap() { - BoundedSortedMap map = new 
BoundedSortedMap( - Comparator.reverseOrder(), 2); - - map.put(3, 3); - map.put(2, 2); - - map.put(2, 3); - - List> expected = Lists.newArrayList(IntPair.of(3, 3), IntPair.of(2, 3)); - assertMap(map, expected); - } - - @Test - public void testPutAllNonSortedMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 3); - - map.put(1, 1); - map.put(2, 2); - - Map param = new HashMap<>(); - param.put(2, 12); - param.put(1, 11); - param.put(3, 13); - param.put(4, 14); - - map.putAll(param); - - List> expected = Lists.newArrayList(IntPair.of(1, 11), IntPair.of(2, 12), - IntPair.of(3, 13)); - assertMap(map, expected); - } - - @Test - public void testPutAllSortedMapWithSameOrderingAllKeysAreSmallerThanFirstKey() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 3); - - map.put(7, 7); - map.put(8, 8); - - SortedMap param = new TreeMap<>(Comparator.naturalOrder()); - param.put(1, 11); - param.put(2, 12); - param.put(3, 13); - param.put(4, 14); - - map.putAll(param); - - List> expected = Lists.newArrayList(IntPair.of(1, 11), IntPair.of(2, 12), - IntPair.of(3, 13)); - assertMap(map, expected); - } - - @Test - public void testPutAllSortedMapWithSameOrderingAllKeysAreBiggerThanLastKey() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 3); - - map.put(1, 1); - map.put(2, 2); - - SortedMap param = new TreeMap<>(Comparator.naturalOrder()); - param.put(3, 13); - param.put(4, 14); - param.put(5, 15); - param.put(6, 16); - - map.putAll(param); - - List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 2), - IntPair.of(3, 13)); - assertMap(map, expected); - } - - @Test - public void testPutAllSortedMapWithSameOrderingOverlappedKeyRange() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 4); - - map.put(1, 1); - map.put(3, 3); - - SortedMap param = new TreeMap<>(Comparator.naturalOrder()); - param.put(2, 12); - param.put(4, 14); - param.put(6, 16); - - map.putAll(param); - - List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 12), - IntPair.of(3, 3), IntPair.of(4, 14)); - assertMap(map, expected); - } - - @Test - public void testPutAllSortedMapWithDifferentOrdering() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.naturalOrder(), 4); - - map.put(1, 1); - map.put(3, 3); - - SortedMap param = new TreeMap<>(Comparator.reverseOrder()); - param.put(6, 16); - param.put(4, 14); - param.put(2, 12); - - map.putAll(param); - - // respecting map's ordering - List> expected = Lists.newArrayList(IntPair.of(1, 1), IntPair.of(2, 12), - IntPair.of(3, 3), IntPair.of(4, 14)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderPutAllNonSortedMap() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 3); - - map.put(2, 2); - map.put(1, 1); - - Map param = new HashMap<>(); - param.put(2, 12); - param.put(1, 11); - param.put(3, 13); - param.put(4, 14); - - map.putAll(param); - - List> expected = Lists.newArrayList(IntPair.of(4, 14), IntPair.of(3, 13), - IntPair.of(2, 12)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderPutAllSortedMapWithSameOrderingAllKeysAreSmallerThanLastKey() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 3); - - map.put(9, 9); - map.put(8, 8); - map.put(7, 7); - - SortedMap param = new TreeMap<>(Comparator.reverseOrder()); - param.put(1, 11); - param.put(2, 12); - param.put(3, 13); - param.put(4, 14); - - map.putAll(param); - - List> expected = 
Lists.newArrayList(IntPair.of(9, 9), IntPair.of(8, 8), - IntPair.of(7, 7)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderPutAllSortedMapWithSameOrderingAllKeysAreBiggerThanFirstKey() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 3); - - map.put(2, 2); - map.put(1, 1); - - SortedMap param = new TreeMap<>(Comparator.reverseOrder()); - param.put(3, 13); - param.put(4, 14); - param.put(5, 15); - param.put(6, 16); - - map.putAll(param); - - List> expected = Lists.newArrayList(IntPair.of(6, 16), IntPair.of(5, 15), - IntPair.of(4, 14)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderPutAllSortedMapWithSameOrderingOverlappedKeyRange() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 4); - - map.put(3, 3); - map.put(1, 1); - - SortedMap param = new TreeMap<>(Comparator.reverseOrder()); - param.put(6, 16); - param.put(4, 14); - param.put(2, 12); - - map.putAll(param); - - List> expected = Lists.newArrayList(IntPair.of(6, 16), IntPair.of(4, 14), - IntPair.of(3, 3), IntPair.of(2, 12)); - assertMap(map, expected); - } - - @Test - public void testReversedOrderPutAllSortedMapWithDifferentOrdering() { - BoundedSortedMap map = new BoundedSortedMap( - Comparator.reverseOrder(), 4); - - map.put(3, 3); - map.put(1, 1); - - SortedMap param = new TreeMap<>(Comparator.naturalOrder()); - param.put(2, 12); - param.put(4, 14); - param.put(6, 16); - - map.putAll(param); - - // respecting map's ordering - List> expected = Lists.newArrayList(IntPair.of(6, 16), IntPair.of(4, 14), - IntPair.of(3, 3), IntPair.of(2, 12)); - assertMap(map, expected); - } - - private void assertMap(Map map, List> expectedEntities) { - Assert.assertEquals(expectedEntities.size(), map.size()); - Assert.assertEquals(expectedEntities.stream().map(Pair::getKey).collect(Collectors.toSet()), - map.keySet()); - - expectedEntities.forEach(entity -> Assert.assertEquals(entity.getValue(), - map.get(entity.getKey()))); - } - - private static class IntPair { - static ImmutablePair of(Integer key, Integer value) { - return new ImmutablePair<>(key, value); - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 73f8705060402..3bf1322b978a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -25,14 +25,15 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random +import java.util import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ - import org.apache.spark._ + import org.apache.spark.LocalSparkContext._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} @@ -64,6 +65,63 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] require(!StateStore.isMaintenanceRunning) } + test("retaining only latest configured size of versions in memory") { + val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 3) + + var currentVersion = 0 + def 
updateVersionTo(targetVersion: Int): Unit = { + for (i <- currentVersion + 1 to targetVersion) { + val store = provider.getStore(currentVersion) + put(store, "a", i) + store.commit() + currentVersion += 1 + } + require(currentVersion === targetVersion) + } + + def restoreOriginValues(map: provider.MapType): Map[String, Int] = { + map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap + } + + updateVersionTo(1) + assert(getData(provider) === Set("a" -> 1)) + var loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 1) + assert(loadedMaps.firstKey() === 1L) + assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) + + updateVersionTo(2) + assert(getData(provider) === Set("a" -> 2)) + loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 2) + assert(loadedMaps.firstKey() === 2L) + assert(loadedMaps.lastKey() === 1L) + assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) + + updateVersionTo(3) + assert(getData(provider) === Set("a" -> 3)) + loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 3) + assert(loadedMaps.firstKey() === 3L) + assert(loadedMaps.lastKey() === 1L) + assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3)) + assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) + + // this triggers exceeding the cache limit and version 1 will be evicted + updateVersionTo(4) + assert(getData(provider) === Set("a" -> 4)) + loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 3) + assert(loadedMaps.firstKey() === 4L) + assert(loadedMaps.lastKey() === 2L) + assert(restoreOriginValues(loadedMaps.get(4L)) === Map("a" -> 4)) + assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3)) + assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + } + test("snapshotting") { val provider = newStoreProvider(opId = Random.nextInt, partition = 0, minDeltasForSnapshot = 5) @@ -535,9 +593,11 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] partition: Int, dir: String = newDir(), minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, + numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get, hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = { val sqlConf = new SQLConf() sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot) + sqlConf.setConf(SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY, numOfVersToRetainInMemory) sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2) val provider = new HDFSBackedStateStoreProvider() provider.init( From 30fac38fc822c56583ef5cadfa293eba274da400 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Wed, 11 Jul 2018 08:09:07 +0900 Subject: [PATCH 09/13] Fix scala style --- .../streaming/state/HDFSBackedStateStoreProvider.scala | 2 +- .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index b55cc7d97e868..a41c439f5c05b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io._ +import java.util import java.util.Locale import scala.collection.JavaConverters._ @@ -25,7 +26,6 @@ import scala.collection.mutable import scala.util.control.NonFatal import com.google.common.io.ByteStreams -import java.util import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 3bf1322b978a5..e3640fb31433f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -25,15 +25,14 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random -import java.util import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark._ +import org.apache.spark._ import org.apache.spark.LocalSparkContext._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} From e95e45a7c0b31749b10de4332be2457f82345afb Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 12 Jul 2018 07:58:18 +0900 Subject: [PATCH 10/13] Add more test cases on MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2/1/0, and fix a silly bug --- .../state/HDFSBackedStateStoreProvider.scala | 5 + .../streaming/state/StateStoreSuite.scala | 120 ++++++++++++------ 2 files changed, 87 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index a41c439f5c05b..b174f4ab68149 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -280,6 +280,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized { + if (numberOfVersionsToRetainInMemory <= 0) { + if (loadedMaps.size() > 0) loadedMaps.clear() + return + } + while (loadedMaps.size() > numberOfVersionsToRetainInMemory) { loadedMaps.remove(loadedMaps.lastKey()) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index e3640fb31433f..2c449142c2460 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -64,33 +64,36 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] require(!StateStore.isMaintenanceRunning) } - test("retaining only latest configured size of 
versions in memory") { - val provider = newStoreProvider(opId = Random.nextInt, partition = 0, - numOfVersToRetainInMemory = 3) - - var currentVersion = 0 - def updateVersionTo(targetVersion: Int): Unit = { - for (i <- currentVersion + 1 to targetVersion) { - val store = provider.getStore(currentVersion) - put(store, "a", i) - store.commit() - currentVersion += 1 - } - require(currentVersion === targetVersion) + def updateVersionTo(provider: StateStoreProvider, currentVersion: => Int, + targetVersion: Int): Int = { + var newCurrentVersion = currentVersion + for (i <- newCurrentVersion + 1 to targetVersion) { + val store = provider.getStore(newCurrentVersion) + put(store, "a", i) + store.commit() + newCurrentVersion += 1 } + require(newCurrentVersion === targetVersion) + newCurrentVersion + } + + test("retaining only two latest versions when MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2") { + val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 2) def restoreOriginValues(map: provider.MapType): Map[String, Int] = { map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap } - updateVersionTo(1) + var currentVersion = 0 + currentVersion = updateVersionTo(provider, currentVersion, 1) assert(getData(provider) === Set("a" -> 1)) var loadedMaps = provider.getClonedLoadedMaps() assert(loadedMaps.size() === 1) assert(loadedMaps.firstKey() === 1L) assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) - updateVersionTo(2) + currentVersion = updateVersionTo(provider, currentVersion, 2) assert(getData(provider) === Set("a" -> 2)) loadedMaps = provider.getClonedLoadedMaps() assert(loadedMaps.size() === 2) @@ -99,43 +102,84 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) - updateVersionTo(3) + // this trigger exceeding cache and 1 will be evicted + currentVersion = updateVersionTo(provider, currentVersion, 3) assert(getData(provider) === Set("a" -> 3)) loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 3) + assert(loadedMaps.size() === 2) assert(loadedMaps.firstKey() === 3L) - assert(loadedMaps.lastKey() === 1L) + assert(loadedMaps.lastKey() === 2L) assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3)) assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + } + + test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") { + val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 1) + + var currentVersion = 0 + + def restoreOriginValues(map: provider.MapType): Map[String, Int] = { + map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap + } + + currentVersion = updateVersionTo(provider, currentVersion, 1) + assert(getData(provider) === Set("a" -> 1)) + var loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 1) + assert(loadedMaps.firstKey() === 1L) assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) - // this trigger exceeding cache and 1 will be evicted - updateVersionTo(4) - assert(getData(provider) === Set("a" -> 4)) + currentVersion = updateVersionTo(provider, currentVersion, 2) + assert(getData(provider) === Set("a" -> 2)) loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 3) - assert(loadedMaps.firstKey() === 4L) - assert(loadedMaps.lastKey() === 2L) - 
+ + test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") { + val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 1) + + var currentVersion = 0 + + def restoreOriginValues(map: provider.MapType): Map[String, Int] = { + map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap + } + + currentVersion = updateVersionTo(provider, currentVersion, 1) + assert(getData(provider) === Set("a" -> 1)) + var loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 1) + assert(loadedMaps.firstKey() === 1L) assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) - // this triggers exceeding the cache limit and version 1 will be evicted - updateVersionTo(4) - assert(getData(provider) === Set("a" -> 4)) + currentVersion = updateVersionTo(provider, currentVersion, 2) + assert(getData(provider) === Set("a" -> 2)) loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 3) - assert(loadedMaps.firstKey() === 4L) - assert(loadedMaps.lastKey() === 2L) - assert(restoreOriginValues(loadedMaps.get(4L)) === Map("a" -> 4)) - assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3)) + // now version 1 is evicted and not stored in cache + // this fact ensures a cache miss will occur when this partition commits successfully + // but a failure afterwards forces it to reprocess the previous batch + assert(loadedMaps.size() === 1) + assert(loadedMaps.firstKey() === 2L) assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + + // suppose a failure occurred after committing, and it decided to reprocess the previous batch + currentVersion = 1 + + val store = provider.getStore(currentVersion) + // negative value to represent reprocessing + put(store, "a", -2) + store.commit() + currentVersion += 1 + + // make sure the newly committed version is reflected in the cache (overwritten) + assert(getData(provider) === Set("a" -> -2)) + loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 1) + assert(loadedMaps.firstKey() === 2L) + assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> -2)) + } + + test("no cache data with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 0") { + val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 0) + + var currentVersion = 0 + + currentVersion = updateVersionTo(provider, currentVersion, 1) + assert(getData(provider) === Set("a" -> 1)) + var loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 0) + + currentVersion = updateVersionTo(provider, currentVersion, 2) + assert(getData(provider) === Set("a" -> 2)) + loadedMaps = provider.getClonedLoadedMaps() + assert(loadedMaps.size() === 0) } test("snapshotting") { val provider = newStoreProvider(opId = Random.nextInt, partition = 0, minDeltasForSnapshot = 5) var currentVersion = 0 - def updateVersionTo(targetVersion: Int): Unit = { - for (i <- currentVersion + 1 to targetVersion) { - val store = provider.getStore(currentVersion) - put(store, "a", i) - store.commit() - currentVersion += 1 - } - require(currentVersion === targetVersion) - } - updateVersionTo(2) + currentVersion = updateVersionTo(provider, currentVersion, 2) require(getData(provider) === Set("a" -> 2)) provider.doMaintenance() // should not generate snapshot files assert(getData(provider) === Set("a" -> 2)) @@ -146,7 +190,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } // After version 6, snapshotting should generate one snapshot file - updateVersionTo(6) + currentVersion = updateVersionTo(provider, currentVersion, 6) require(getData(provider) === Set("a" -> 6), "store not updated correctly") provider.doMaintenance() // should generate snapshot files @@ -161,7 +205,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] "snapshotting messed up the data of the final version") // After version 20, snapshotting should generate newer snapshot files - updateVersionTo(20) + currentVersion = updateVersionTo(provider, currentVersion, 20) require(getData(provider) === Set("a" -> 20), "store not updated correctly") provider.doMaintenance() // do snapshot
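The "failure after committing" test relies on a property worth making explicit: putting an existing key into the cache replaces its value, so a reprocessed batch simply overwrites the stale entry for the same version. A minimal sketch with hypothetical values, using a plain TreeMap to stand in for the provider's loadedMaps:

  import java.util.{Comparator, TreeMap}

  val cache = new TreeMap[java.lang.Long, Map[String, Int]](Comparator.reverseOrder[java.lang.Long]())
  cache.put(2L, Map("a" -> 2))  // version 2 as first committed
  cache.put(2L, Map("a" -> -2)) // version 2 as reprocessed: overwrites the stale entry
  assert(cache.size() == 1)
  assert(cache.get(2L) == Map("a" -> -2))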
From f07ad0466190a02c40669e300f66f3170a1c2543 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 17 Jul 2018 18:13:09 +0900 Subject: [PATCH 11/13] Address review comments from @tdas --- .../state/HDFSBackedStateStoreProvider.scala | 11 +- .../streaming/state/StateStoreSuite.scala | 123 +++++++++++------- 2 files changed, 79 insertions(+), 55 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index b174f4ab68149..a350afd1952cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -253,7 +253,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = { synchronized { finalizeDeltaFile(output) - putStateIntoStateCache(newVersion, map) + putStateIntoStateCacheMap(newVersion, map) } } @@ -279,7 +279,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit loadedMaps.clone().asInstanceOf[util.SortedMap[Long, MapType]] } - private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized { + private def putStateIntoStateCacheMap(newVersion: Long, map: MapType): Unit = synchronized { if (numberOfVersionsToRetainInMemory <= 0) { if (loadedMaps.size() > 0) loadedMaps.clear() return @@ -293,7 +293,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit if (size == numberOfVersionsToRetainInMemory) { val versionIdForLastKey = loadedMaps.lastKey() if (versionIdForLastKey > newVersion) { - // this is the only case which put doesn't need + // this is the only case where we can avoid putting, because the new version would be + // placed at the last key and evicted right away return } else if (versionIdForLastKey < newVersion) { // this case needs removal of the last key before putting new one @@ -320,7 +321,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val (result, elapsedMs) = Utils.timeTakenMs { val snapshotCurrentVersionMap = readSnapshotFile(version) if (snapshotCurrentVersionMap.isDefined) { - synchronized { putStateIntoStateCache(version, snapshotCurrentVersionMap.get) } + synchronized { putStateIntoStateCacheMap(version, snapshotCurrentVersionMap.get) } return snapshotCurrentVersionMap.get } @@ -348,7 +349,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit updateFromDeltaFile(deltaVersion, resultMap) } - synchronized { putStateIntoStateCache(version, resultMap) } + synchronized { putStateIntoStateCacheMap(version, resultMap) } resultMap }
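For orientation, the read path these hunks feed into looks roughly as below - a condensed sketch assembled from the diffs in this series, not a verbatim copy of the provider (the recursion over version - 1 is a simplification of the actual delta-replay loop):

  def loadMap(version: Long): MapType = {
    synchronized { Option(loadedMaps.get(version)) } match {
      case Some(map) => map // cache hit: no file I/O
      case None =>
        // cache miss: rebuild from the latest snapshot plus delta files
        val map = readSnapshotFile(version).getOrElse {
          val resultMap = new MapType(loadMap(version - 1))
          updateFromDeltaFile(version, resultMap)
          resultMap
        }
        synchronized { putStateIntoStateCacheMap(version, map) }
        map
    }
  }

The key point is the last step: every rebuilt map is pushed back through putStateIntoStateCacheMap, so the retention bound is enforced on reads as well as on commits.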
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 2c449142c2460..d950038a5103f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -25,14 +25,15 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random +import java.util import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ - import org.apache.spark._ + import org.apache.spark.LocalSparkContext._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} @@ -47,6 +48,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] with BeforeAndAfter with PrivateMethodTester { type MapType = mutable.HashMap[UnsafeRow, UnsafeRow] + type ProviderMapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow] import StateStoreCoordinatorSuite._ import StateStoreTestsHelper._ @@ -64,53 +66,76 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] require(!StateStore.isMaintenanceRunning) } - def updateVersionTo(provider: StateStoreProvider, currentVersion: => Int, - targetVersion: Int): Int = { + def updateVersionTo( + provider: StateStoreProvider, + currentVersion: Int, + targetVersion: Int): Int = { var newCurrentVersion = currentVersion - for (i <- newCurrentVersion + 1 to targetVersion) { - val store = provider.getStore(newCurrentVersion) - put(store, "a", i) - store.commit() - newCurrentVersion += 1 + for (i <- newCurrentVersion until targetVersion) { + newCurrentVersion = incrementVersion(provider, i) } require(newCurrentVersion === targetVersion) newCurrentVersion } + def incrementVersion(provider: StateStoreProvider, currentVersion: Int): Int = { + val store = provider.getStore(currentVersion) + put(store, "a", currentVersion + 1) + store.commit() + currentVersion + 1 + } + + def checkLoadedVersions( + loadedMaps: util.SortedMap[Long, ProviderMapType], + count: Int, + earliestKey: Long, + latestKey: Long): Unit = { + assert(loadedMaps.size() === count) + assert(loadedMaps.firstKey() === earliestKey) + assert(loadedMaps.lastKey() === latestKey) + } + + def checkVersion( + loadedMaps: util.SortedMap[Long, ProviderMapType], + version: Long, + expectedData: Map[String, Int]): Unit = { + + val originValueMap = loadedMaps.get(version).asScala.map { entry => + rowToString(entry._1) -> rowToInt(entry._2) + }.toMap + + assert(originValueMap === expectedData) + }
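One naming subtlety in these helpers: because loadedMaps orders keys in descending version order, firstKey() is the newest retained version and lastKey() the oldest. checkLoadedVersions asserts earliestKey against firstKey(), so "earliest" and "latest" refer to iteration order of the descending map, not to version age. A condensed illustration, using the named-argument style the final revision of this series adopts (values hypothetical):

  // after committing versions 1 and 2 with the bound at 2,
  // iteration order is 2, 1 - newest first
  checkLoadedVersions(loadedMaps, count = 2, earliestKey = 2, latestKey = 1)
  checkVersion(loadedMaps, 2, Map("a" -> 2))
  checkVersion(loadedMaps, 1, Map("a" -> 1))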
+ test("retaining only two latest versions when MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2") { val provider = newStoreProvider(opId = Random.nextInt, partition = 0, numOfVersToRetainInMemory = 2) - def restoreOriginValues(map: provider.MapType): Map[String, Int] = { - map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap - } - var currentVersion = 0 - currentVersion = updateVersionTo(provider, currentVersion, 1) + var currentVersion = 0 + + // commit the ver 1 : cache will have one element + currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 1)) var loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 1) - assert(loadedMaps.firstKey() === 1L) - assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) + checkLoadedVersions(loadedMaps, 1, 1L, 1L) + checkVersion(loadedMaps, 1L, Map("a" -> 1)) - currentVersion = updateVersionTo(provider, currentVersion, 2) + // commit the ver 2 : cache will have two elements + currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 2)) loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 2) - assert(loadedMaps.firstKey() === 2L) - assert(loadedMaps.lastKey() === 1L) - assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) - assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) - - // this triggers exceeding the cache limit and version 1 will be evicted - currentVersion = updateVersionTo(provider, currentVersion, 3) + checkLoadedVersions(loadedMaps, 2, 2L, 1L) + checkVersion(loadedMaps, 2L, Map("a" -> 2)) + checkVersion(loadedMaps, 1L, Map("a" -> 1)) + + // commit the ver 3 : cache already has two elements and adding ver 3 exceeds the cache limit, + // and ver 3 will be added but ver 1 will be evicted + currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 3)) loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 2) - assert(loadedMaps.firstKey() === 3L) - assert(loadedMaps.lastKey() === 2L) - assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3)) - assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + checkLoadedVersions(loadedMaps, 2, 3L, 2L) + checkVersion(loadedMaps, 3L, Map("a" -> 3)) + checkVersion(loadedMaps, 2L, Map("a" -> 2)) } test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") { @@ -119,30 +144,27 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] var currentVersion = 0 - def restoreOriginValues(map: provider.MapType): Map[String, Int] = { - map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap - } - - currentVersion = updateVersionTo(provider, currentVersion, 1) + // commit the ver 1 : cache will have one element + currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 1)) var loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 1) - assert(loadedMaps.firstKey() === 1L) - assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) + checkLoadedVersions(loadedMaps, 1, 1L, 1L) + checkVersion(loadedMaps, 1L, Map("a" -> 1)) - currentVersion = updateVersionTo(provider, currentVersion, 2) - assert(getData(provider) === Set("a" -> 2)) - loadedMaps = provider.getClonedLoadedMaps() - // now version 1 is evicted and not stored in cache + // commit the ver 2 : cache already has one element and adding ver 2 exceeds the cache limit, + // and ver 2 will be added but ver 1 will be evicted // this fact ensures a cache miss will occur when this partition commits successfully // but a failure afterwards forces it to reprocess the previous batch - assert(loadedMaps.size() === 1) - assert(loadedMaps.firstKey() === 2L) - assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + currentVersion = incrementVersion(provider, currentVersion) + assert(getData(provider) === Set("a" -> 2)) + loadedMaps = provider.getClonedLoadedMaps() + checkLoadedVersions(loadedMaps, 1, 2L, 2L) + checkVersion(loadedMaps, 2L, Map("a" -> 2)) // suppose a failure occurred after committing, and it decided to reprocess the previous batch currentVersion = 1 + // committing to an existing version which was committed partially but abandoned globally val store = provider.getStore(currentVersion) // negative value to represent reprocessing put(store, "a", -2) store.commit() currentVersion += 1 // make sure the newly committed version is reflected in the cache (overwritten) assert(getData(provider) === Set("a" -> -2)) loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 1) - assert(loadedMaps.firstKey() === 2L) - assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> -2)) + checkLoadedVersions(loadedMaps, 1, 2L, 2L) + checkVersion(loadedMaps, 2L, Map("a" -> -2)) } test("no cache data with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 0") { @@ -163,12 +184,14 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider] var currentVersion = 0 - currentVersion = updateVersionTo(provider, currentVersion, 1) + // commit the ver 1 : never cached + currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 1)) var loadedMaps = provider.getClonedLoadedMaps() assert(loadedMaps.size() === 0) - currentVersion = updateVersionTo(provider, currentVersion, 2) + // commit the ver 2 : never cached + currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 2)) loadedMaps = provider.getClonedLoadedMaps() assert(loadedMaps.size() === 0) From 02b49724b904ef7314968a4bd1f83e2b86cfc3ee Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 17 Jul 2018 18:16:25 +0900 Subject: [PATCH 12/13] Fix scala style (mostly import) --- .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index d950038a5103f..c6f6bf0f02018 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -19,21 +19,21 @@ package org.apache.spark.sql.execution.streaming.state import java.io.{File, IOException} import java.net.URI +import java.util import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random -import java.util import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark._ +import org.apache.spark._ import org.apache.spark.LocalSparkContext._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} From cf78a2a25791a683c0ee36b08bdc79edd54f212a Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 19 Jul 2018 10:23:35 +0900 Subject: [PATCH 13/13] Address review comments from @tdas --- .../state/HDFSBackedStateStoreProvider.scala | 2 +- .../streaming/state/StateStoreSuite.scala | 44 +++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index a350afd1952cc..523acef34ca61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -274,7 +274,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } /** This method is intended to be only used for unit test(s). DO NOT TOUCH ELEMENTS IN MAP! 
*/ - private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] = synchronized { + private[state] def getLoadedMaps(): util.SortedMap[Long, MapType] = synchronized { // shallow copy as a minimal guard loadedMaps.clone().asInstanceOf[util.SortedMap[Long, MapType]] }
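The rename keeps the shallow-copy guard intact: clone() duplicates only the TreeMap structure, so the returned map still shares its value maps with the provider - hence the warning in the doc comment. Illustratively (hypothetical calls, not from the patch):

  val maps = provider.getLoadedMaps()
  maps.remove(2L) // fine: a structural change touches only the clone
  // maps.get(2L).clear() would mutate state shared with the provider - never touch the values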
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index c6f6bf0f02018..bfeb2b16ff7be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -116,26 +116,26 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // commit the ver 1 : cache will have one element currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 1)) - var loadedMaps = provider.getClonedLoadedMaps() - checkLoadedVersions(loadedMaps, 1, 1L, 1L) - checkVersion(loadedMaps, 1L, Map("a" -> 1)) + var loadedMaps = provider.getLoadedMaps() + checkLoadedVersions(loadedMaps, count = 1, earliestKey = 1, latestKey = 1) + checkVersion(loadedMaps, 1, Map("a" -> 1)) // commit the ver 2 : cache will have two elements currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 2)) - loadedMaps = provider.getClonedLoadedMaps() - checkLoadedVersions(loadedMaps, 2, 2L, 1L) - checkVersion(loadedMaps, 2L, Map("a" -> 2)) - checkVersion(loadedMaps, 1L, Map("a" -> 1)) + loadedMaps = provider.getLoadedMaps() + checkLoadedVersions(loadedMaps, count = 2, earliestKey = 2, latestKey = 1) + checkVersion(loadedMaps, 2, Map("a" -> 2)) + checkVersion(loadedMaps, 1, Map("a" -> 1)) // commit the ver 3 : cache already has two elements and adding ver 3 exceeds the cache limit, // and ver 3 will be added but ver 1 will be evicted currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 3)) - loadedMaps = provider.getClonedLoadedMaps() - checkLoadedVersions(loadedMaps, 2, 3L, 2L) - checkVersion(loadedMaps, 3L, Map("a" -> 3)) - checkVersion(loadedMaps, 2L, Map("a" -> 2)) + loadedMaps = provider.getLoadedMaps() + checkLoadedVersions(loadedMaps, count = 2, earliestKey = 3, latestKey = 2) + checkVersion(loadedMaps, 3, Map("a" -> 3)) + checkVersion(loadedMaps, 2, Map("a" -> 2)) } test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") { @@ -147,9 +147,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // commit the ver 1 : cache will have one element currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 1)) - var loadedMaps = provider.getClonedLoadedMaps() - checkLoadedVersions(loadedMaps, 1, 1L, 1L) - checkVersion(loadedMaps, 1L, Map("a" -> 1)) + var loadedMaps = provider.getLoadedMaps() + checkLoadedVersions(loadedMaps, count = 1, earliestKey = 1, latestKey = 1) + checkVersion(loadedMaps, 1, Map("a" -> 1)) // commit the ver 2 : cache already has one element and adding ver 2 exceeds the cache limit, // and ver 2 will be added but ver 1 will be evicted // this fact ensures a cache miss will occur when this partition commits successfully // but a failure afterwards forces it to reprocess the previous batch currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 2)) - loadedMaps = provider.getClonedLoadedMaps() - checkLoadedVersions(loadedMaps, 1, 2L, 2L) - checkVersion(loadedMaps, 2L, Map("a" -> 2)) + loadedMaps = provider.getLoadedMaps() + checkLoadedVersions(loadedMaps, count = 1, earliestKey = 2, latestKey = 2) + checkVersion(loadedMaps, 2, Map("a" -> 2)) // suppose a failure occurred after committing, and it decided to reprocess the previous batch currentVersion = 1 @@ -173,9 +173,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // make sure the newly committed version is reflected in the cache (overwritten) assert(getData(provider) === Set("a" -> -2)) - loadedMaps = provider.getClonedLoadedMaps() - checkLoadedVersions(loadedMaps, 1, 2L, 2L) - checkVersion(loadedMaps, 2L, Map("a" -> -2)) + loadedMaps = provider.getLoadedMaps() + checkLoadedVersions(loadedMaps, count = 1, earliestKey = 2, latestKey = 2) + checkVersion(loadedMaps, 2, Map("a" -> -2)) } test("no cache data with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 0") { @@ -187,13 +187,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // commit the ver 1 : never cached currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 1)) - var loadedMaps = provider.getClonedLoadedMaps() + var loadedMaps = provider.getLoadedMaps() assert(loadedMaps.size() === 0) // commit the ver 2 : never cached currentVersion = incrementVersion(provider, currentVersion) assert(getData(provider) === Set("a" -> 2)) - loadedMaps = provider.getClonedLoadedMaps() + loadedMaps = provider.getLoadedMaps() assert(loadedMaps.size() === 0) }
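Taken together, the series turns in-memory state retention into a tunable knob. A sketch of setting it on a session (the value 1 is hypothetical; the default remains 2):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[2]")
    .config("spark.sql.streaming.maxBatchesToRetainInMemory", 1)
    .getOrCreate()
  // smaller values trade memory for recovery cost: fewer cached versions in each
  // HDFSBackedStateStoreProvider, more reloads from snapshot and delta files on retry

The configuration is marked internal, so it is a tuning lever for operators who understand the trade-off rather than a public API.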