From cba55b7ef0445134a3c55cacafadd1259db04b0a Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 16 Sep 2019 23:49:46 +0900 Subject: [PATCH 1/7] [SPARK-29111][CORE] Support snapshot/restore on KVStore --- .../spark/util/kvstore/InMemoryStore.java | 21 ++- .../apache/spark/util/kvstore/KVStore.java | 11 ++ .../util/kvstore/KVStoreSnapshotter.java | 145 ++++++++++++++++++ .../apache/spark/util/kvstore/LevelDB.java | 13 ++ .../spark/util/kvstore/ArrayKeyIndexType.java | 1 - .../util/kvstore/InMemoryIteratorSuite.java | 2 +- .../util/kvstore/InMemoryStoreSuite.java | 50 ++++++ .../apache/spark/util/kvstore/IntKeyType.java | 44 ++++++ ...orSuite.java => KVStoreIteratorSuite.java} | 6 +- .../util/kvstore/KVStoreSnapshotterSuite.java | 119 ++++++++++++++ .../spark/util/kvstore/KVStoreSuite.java | 30 ++++ .../util/kvstore/LevelDBIteratorSuite.java | 2 +- .../spark/util/kvstore/LevelDBSuite.java | 32 +--- 13 files changed, 436 insertions(+), 40 deletions(-) create mode 100644 common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java create mode 100644 common/kvstore/src/test/java/org/apache/spark/util/kvstore/IntKeyType.java rename common/kvstore/src/test/java/org/apache/spark/util/kvstore/{DBIteratorSuite.java => KVStoreIteratorSuite.java} (98%) create mode 100644 common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java create mode 100644 common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index b33c53871c32f..231dafd70422c 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -17,13 +17,7 @@ package org.apache.spark.util.kvstore; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.HashSet; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.BiConsumer; @@ -106,6 +100,19 @@ public KVStoreView view(Class type){ return list != null ? list.view() : emptyView(); } + @Override + public Class metadataType() { + if (metadata == null) { + return null; + } + return metadata.getClass(); + } + + @Override + public Set> types() { + return inMemoryLists.data.keySet(); + } + @Override public void close() { metadata = null; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java index ac159eb43182f..c7f2872fd31a1 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java @@ -19,6 +19,7 @@ import java.io.Closeable; import java.util.Collection; +import java.util.Set; import org.apache.spark.annotation.Private; @@ -117,6 +118,16 @@ public interface KVStore extends Closeable { */ KVStoreView view(Class type) throws Exception; + /** + * Returns a type of app-specific metadata from the store, or null if it's not currently set. + */ + Class metadataType(); + + /** + * Returns all available types of all objects. + */ + Set> types(); + /** * Returns the number of items of the given type currently in the store. */ diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java new file mode 100644 index 0000000000000..a6a5721364c80 --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import com.google.common.io.ByteStreams; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.Set; + +public class KVStoreSnapshotter { + private static final int MARKER_END_OF_TYPE = -2; + private static final int MARKER_END_OF_FILE = -1; + + private final KVStoreSerializer serializer; + + public KVStoreSnapshotter(KVStoreSerializer serializer) { + this.serializer = serializer; + } + + public void dump(KVStore store, File snapshotFile) throws Exception { + DataOutputStream output = new DataOutputStream(new FileOutputStream(snapshotFile)); + + // store metadata if it exists + Class metadataType = store.metadataType(); + if (metadataType != null) { + writeClassName(metadataType, output); + Object metadata = store.getMetadata(metadataType); + writeObject(metadata, output); + writeEndOfType(output); + } else { + writeEndOfType(output); + } + + Set> types = store.types(); + for (Class clazz : types) { + writeClassName(clazz, output); + + KVStoreView view = store.view(clazz); + for (Object obj : view) { + writeObject(obj, output); + } + + writeEndOfType(output); + } + + writeEndOfFile(output); + output.close(); + } + + public void restore(File snapshotFile, KVStore store) throws Exception { + DataInputStream input = new DataInputStream(new FileInputStream(snapshotFile)); + + // first one would be metadata + int metadataClazzLen = input.readInt(); + if (metadataClazzLen > 0) { + Class metadataClazz = readClassName(input, metadataClazzLen); + // metadata presented + int objLen = input.readInt(); + Object metadata = readObj(input, metadataClazz, objLen); + store.setMetadata(metadata); + + // additionally read -2 as end of type + consumeEndOfType(input); + } + + boolean eof = false; + while (!eof) { + int typeClazzNameLen = input.readInt(); + if (typeClazzNameLen == MARKER_END_OF_FILE) { + eof = true; + } else { + Class typeClazz = readClassName(input, typeClazzNameLen); + boolean eot = false; + while (!eot) { + int objLen = input.readInt(); + if (objLen == MARKER_END_OF_TYPE) { + eot = true; + } else { + Object obj = readObj(input, typeClazz, objLen); + store.write(obj); + } + } + } + } + + input.close(); + } + + private void writeClassName(Class clazz, DataOutputStream output) throws IOException { + byte[] clazzName = clazz.getCanonicalName().getBytes(StandardCharsets.UTF_8); + output.writeInt(clazzName.length); + output.write(clazzName); + } + + private void writeObject(Object obj, DataOutputStream output) throws Exception { + byte[] ser = serializer.serialize(obj); + output.writeInt(ser.length); + output.write(ser); + } + + private void writeEndOfType(DataOutputStream output) throws IOException { + output.writeInt(MARKER_END_OF_TYPE); + } + + private void writeEndOfFile(DataOutputStream output) throws IOException { + output.writeInt(MARKER_END_OF_FILE); + } + + private Class readClassName(DataInputStream input, int classNameLen) throws IOException, ClassNotFoundException { + byte[] classNameBuffer = new byte[classNameLen]; + ByteStreams.readFully(input, classNameBuffer, 0, classNameLen); + String className = new String(classNameBuffer, StandardCharsets.UTF_8); + return Class.forName(className); + } + + private Object readObj(DataInputStream input, Class clazz, int objLen) throws Exception { + byte[] objBuffer = new byte[objLen]; + ByteStreams.readFully(input, objBuffer, 0, objLen); + return serializer.deserialize(objBuffer, clazz); + } + + private void consumeEndOfType(DataInputStream input) throws IOException { + int eotCode = input.readInt(); + if (eotCode != MARKER_END_OF_TYPE) { + throw new IllegalStateException("The notion of 'end of type' is expected here, but got " + + eotCode + " instead"); + } + } +} diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 2ca4b0b2cb9f9..935a7a1f24629 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -63,6 +63,7 @@ public class LevelDB implements KVStore { */ private final ConcurrentMap typeAliases; private final ConcurrentMap, LevelDBTypeInfo> types; + private Class metadataType; public LevelDB(File path) throws Exception { this(path, new KVStoreSerializer()); @@ -109,8 +110,10 @@ public T getMetadata(Class klass) throws Exception { public void setMetadata(Object value) throws Exception { if (value != null) { put(METADATA_KEY, value); + metadataType = value.getClass(); } else { db().delete(METADATA_KEY); + metadataType = null; } } @@ -197,6 +200,16 @@ public Iterator iterator() { }; } + @Override + public Class metadataType() { + return metadataType; + } + + @Override + public Set> types() { + return types.keySet(); + } + @Override public boolean removeAllByIndexValues( Class klass, diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java index dd53fdf0b1b4c..8c57bd8ee0f7a 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java @@ -40,5 +40,4 @@ public boolean equals(Object o) { public int hashCode() { return Arrays.hashCode(key) ^ Arrays.hashCode(id); } - } diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java index 27dde6a9fbea1..8ca97d10b1619 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java @@ -17,7 +17,7 @@ package org.apache.spark.util.kvstore; -public class InMemoryIteratorSuite extends DBIteratorSuite { +public class InMemoryIteratorSuite extends KVStoreIteratorSuite { @Override protected KVStore createStore() { diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java index 9e34225e14e18..42a5a638fbdad 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java @@ -17,9 +17,11 @@ package org.apache.spark.util.kvstore; +import java.util.Arrays; import java.util.NoSuchElementException; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.junit.Test; import static org.junit.Assert.*; @@ -86,6 +88,52 @@ public void testMultipleObjectWriteReadDelete() throws Exception { } } + @Test + public void testMultipleTypesWriteReadDelete() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t1 = new CustomType1(); + t1.key = "key1"; + t1.id = "id"; + t1.name = "name1"; + + IntKeyType t2 = new IntKeyType(); + t2.key = 2; + t2.id = "2"; + t2.values = Arrays.asList("value1", "value2"); + + ArrayKeyIndexType t3 = new ArrayKeyIndexType(); + t3.key = new int[] { 42, 84 }; + t3.id = new String[] { "id1", "id2" }; + + store.write(t1); + store.write(t2); + store.write(t3); + + assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class), store.types()); + + assertEquals(t1, store.read(t1.getClass(), t1.key)); + assertEquals(t2, store.read(t2.getClass(), t2.key)); + assertEquals(t3, store.read(t3.getClass(), t3.key)); + + // There should be one "id" index with a single entry for each type. + assertEquals(1, store.count(t1.getClass(), "id", t1.id)); + assertEquals(1, store.count(t2.getClass(), "id", t2.id)); + assertEquals(1, store.count(t3.getClass(), "id", t3.id)); + + // Delete the first entry; this should not affect the entries for the second type. + store.delete(t1.getClass(), t1.key); + assertEquals(1, store.count(t2.getClass(), "id", t2.id)); + assertEquals(1, store.count(t3.getClass(), "id", t3.id)); + + // Delete the remaining entries, make sure all data is gone. + store.delete(t2.getClass(), t2.key); + assertEquals(0, store.count(t2.getClass())); + + store.delete(t3.getClass(), t3.key); + assertEquals(0, store.count(t3.getClass())); + } + @Test public void testMetadata() throws Exception { KVStore store = new InMemoryStore(); @@ -96,9 +144,11 @@ public void testMetadata() throws Exception { t.name = "name"; store.setMetadata(t); + assertEquals(CustomType1.class, store.metadataType()); assertEquals(t, store.getMetadata(CustomType1.class)); store.setMetadata(null); + assertNull(store.metadataType()); assertNull(store.getMetadata(CustomType1.class)); } diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/IntKeyType.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/IntKeyType.java new file mode 100644 index 0000000000000..81f251e8f6c41 --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/IntKeyType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import java.util.List; + +public class IntKeyType { + @KVIndex + public int key; + + @KVIndex("id") + public String id; + + public List values; + + @Override + public boolean equals(Object o) { + if (o instanceof IntKeyType) { + IntKeyType other = (IntKeyType) o; + return key == other.key && id.equals(other.id) && values.equals(other.values); + } + return false; + } + + @Override + public int hashCode() { + return id.hashCode(); + } +} diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreIteratorSuite.java similarity index 98% rename from common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java rename to common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreIteratorSuite.java index 1e062437d1803..74907af3ec7cb 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreIteratorSuite.java @@ -36,9 +36,9 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.*; -public abstract class DBIteratorSuite { +public abstract class KVStoreIteratorSuite { - private static final Logger LOG = LoggerFactory.getLogger(DBIteratorSuite.class); + private static final Logger LOG = LoggerFactory.getLogger(KVStoreIteratorSuite.class); private static final int MIN_ENTRIES = 42; private static final int MAX_ENTRIES = 1024; @@ -380,7 +380,7 @@ public void childIndexDescendingWithLast() throws Exception { @Test public void testRefWithIntNaturalKey() throws Exception { - LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType(); + IntKeyType i = new IntKeyType(); i.key = 1; i.id = "1"; i.values = Arrays.asList("1"); diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java new file mode 100644 index 0000000000000..ecbad293a670d --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import com.google.common.collect.Sets; +import org.apache.commons.io.FileUtils; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.*; + +public class KVStoreSnapshotterSuite { + + private final KVStoreSerializer serializer = new KVStoreSerializer(); + private final KVStoreSnapshotter snapshotter = new KVStoreSnapshotter(serializer); + + @Test + public void testMetadataEnabled() throws Exception { + KVStore src = new InMemoryStore(); + KVStore dest = new InMemoryStore(); + + CustomType1 t = new CustomType1(); + t.key = "key"; + t.id = "id"; + t.name = "name"; + src.setMetadata(t); + + prepareTestObjects(src); + runTestSnapshotAndRestore(src, dest); + } + + @Test + public void testMetadataNotAvailable() throws Exception { + KVStore src = new InMemoryStore(); + KVStore dest = new InMemoryStore(); + + prepareTestObjects(src); + runTestSnapshotAndRestore(src, dest); + } + + private void prepareTestObjects(KVStore testStore) throws Exception { + CustomType1 t = new CustomType1(); + t.key = "key"; + t.id = "id"; + t.name = "name"; + + CustomType1 t2 = new CustomType1(); + t2.key = "key2"; + t2.id = "id"; + t2.name = "name2"; + + ArrayKeyIndexType t3 = new ArrayKeyIndexType(); + t3.key = new int[] { 42, 84 }; + t3.id = new String[] { "id1", "id2" }; + + IntKeyType t4 = new IntKeyType(); + t4.key = 2; + t4.id = "2"; + t4.values = Arrays.asList("value1", "value2"); + + testStore.write(t); + testStore.write(t2); + testStore.write(t3); + testStore.write(t4); + } + + // source is expected to have some metadata and objects + // destination has to be empty + private void runTestSnapshotAndRestore(KVStore source, KVStore destination) throws Exception { + File snapshotFile = File.createTempFile("test-kvstore", ".snapshot"); + snapshotFile.delete(); + assertFalse(snapshotFile.exists()); + + try { + snapshotter.dump(source, snapshotFile); + assertTrue(snapshotFile.exists() && snapshotFile.isFile()); + assertTrue(snapshotFile.length() > 0); + + snapshotter.restore(snapshotFile, destination); + + Class metadataType = source.metadataType(); + assertEquals(destination.metadataType(), metadataType); + if (metadataType != null) { + assertEquals(destination.getMetadata(metadataType), source.getMetadata(metadataType)); + } + + Set> objectTypes = source.types(); + assertEquals(destination.types(), objectTypes); + for (Class tpe : objectTypes) { + Set destObjs = new HashSet<>(); + destination.view(tpe).closeableIterator().forEachRemaining(destObjs::add); + Set srcObjs = new HashSet<>(); + source.view(tpe).closeableIterator().forEachRemaining(srcObjs::add); + assertEquals(destObjs, srcObjs); + } + } finally { + FileUtils.deleteQuietly(snapshotFile); + } + } +} diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java new file mode 100644 index 0000000000000..961053ba18a4d --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java @@ -0,0 +1,30 @@ +package org.apache.spark.util.kvstore; + +import org.junit.After; +import org.junit.Before; + +public abstract class KVStoreSuite { + private static KVStore db; + + /** + * Implementations should override this method; it is called only once, before all tests are + * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass + * handler. + */ + protected abstract KVStore createStore() throws Exception; + + @Before + public void setup() throws Exception { + db = createStore(); + } + + @After + public void tearDown() throws Exception { + if (db != null) { + db.close(); + db = null; + } + } + + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java index f8195da58cf9f..5e9d171785c98 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java @@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils; import org.junit.AfterClass; -public class LevelDBIteratorSuite extends DBIteratorSuite { +public class LevelDBIteratorSuite extends KVStoreIteratorSuite { private static File dbpath; private static LevelDB db; diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java index 0b755ba0e8000..854afe5de8eb7 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java @@ -25,6 +25,7 @@ import java.util.stream.StreamSupport; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.iq80.leveldb.DBIterator; import org.junit.After; @@ -148,6 +149,8 @@ public void testMultipleTypesWriteReadDelete() throws Exception { db.write(t2); db.write(t3); + assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class), db.types()); + assertEquals(t1, db.read(t1.getClass(), t1.key)); assertEquals(t2, db.read(t2.getClass(), t2.key)); assertEquals(t3, db.read(t3.getClass(), t3.key)); @@ -178,9 +181,11 @@ public void testMetadata() throws Exception { CustomType1 t = createCustomType1(1); db.setMetadata(t); + assertEquals(CustomType1.class, db.metadataType()); assertEquals(t, db.getMetadata(CustomType1.class)); db.setMetadata(null); + assertNull(db.metadataType()); assertNull(db.getMetadata(CustomType1.class)); } @@ -302,31 +307,4 @@ private int countKeys(Class type) throws Exception { return count; } - - public static class IntKeyType { - - @KVIndex - public int key; - - @KVIndex("id") - public String id; - - public List values; - - @Override - public boolean equals(Object o) { - if (o instanceof IntKeyType) { - IntKeyType other = (IntKeyType) o; - return key == other.key && id.equals(other.id) && values.equals(other.values); - } - return false; - } - - @Override - public int hashCode() { - return id.hashCode(); - } - - } - } From 24fe8548ba8177b4bc810df1fd233f5263fbd87c Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 17 Sep 2019 12:42:27 +0900 Subject: [PATCH 2/7] Remove unused file --- .../spark/util/kvstore/KVStoreSuite.java | 30 ------------------- 1 file changed, 30 deletions(-) delete mode 100644 common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java deleted file mode 100644 index 961053ba18a4d..0000000000000 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSuite.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.apache.spark.util.kvstore; - -import org.junit.After; -import org.junit.Before; - -public abstract class KVStoreSuite { - private static KVStore db; - - /** - * Implementations should override this method; it is called only once, before all tests are - * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass - * handler. - */ - protected abstract KVStore createStore() throws Exception; - - @Before - public void setup() throws Exception { - db = createStore(); - } - - @After - public void tearDown() throws Exception { - if (db != null) { - db.close(); - db = null; - } - } - - -} From 6ea1e36c6f9c02f6ff17b1f581a41f46e46fc107 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 17 Sep 2019 12:50:26 +0900 Subject: [PATCH 3/7] Fix compilation - missed to add file --- .../scala/org/apache/spark/status/ElementTrackingStore.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index 38cb030297c81..5658571e0c486 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -17,6 +17,7 @@ package org.apache.spark.status +import java.util import java.util.Collection import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean @@ -197,6 +198,9 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten threshold: Long, action: Long => Unit) + override def metadataType(): Class[_] = store.metadataType() + + override def types(): util.Set[Class[_]] = store.types() } private[spark] object ElementTrackingStore { From c8261d7f5366a377d7762fda16e2c369dc84f7d0 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 17 Sep 2019 13:25:43 +0900 Subject: [PATCH 4/7] Fix issues with java lint --- .../org/apache/spark/util/kvstore/KVStoreSnapshotter.java | 4 +++- .../org/apache/spark/util/kvstore/InMemoryStoreSuite.java | 5 +++-- .../apache/spark/util/kvstore/KVStoreSnapshotterSuite.java | 2 -- .../java/org/apache/spark/util/kvstore/LevelDBSuite.java | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java index a6a5721364c80..9513fabdb4a3b 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java @@ -122,7 +122,9 @@ private void writeEndOfFile(DataOutputStream output) throws IOException { output.writeInt(MARKER_END_OF_FILE); } - private Class readClassName(DataInputStream input, int classNameLen) throws IOException, ClassNotFoundException { + private Class readClassName( + DataInputStream input, + int classNameLen) throws IOException, ClassNotFoundException { byte[] classNameBuffer = new byte[classNameLen]; ByteStreams.readFully(input, classNameBuffer, 0, classNameLen); String className = new String(classNameBuffer, StandardCharsets.UTF_8); diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java index 42a5a638fbdad..47ac200616558 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java @@ -91,7 +91,7 @@ public void testMultipleObjectWriteReadDelete() throws Exception { @Test public void testMultipleTypesWriteReadDelete() throws Exception { KVStore store = new InMemoryStore(); - + CustomType1 t1 = new CustomType1(); t1.key = "key1"; t1.id = "id"; @@ -110,7 +110,8 @@ public void testMultipleTypesWriteReadDelete() throws Exception { store.write(t2); store.write(t3); - assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class), store.types()); + assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class), + store.types()); assertEquals(t1, store.read(t1.getClass(), t1.key)); assertEquals(t2, store.read(t2.getClass(), t2.key)); diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java index ecbad293a670d..dd584c906c0fb 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java @@ -17,7 +17,6 @@ package org.apache.spark.util.kvstore; -import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.junit.Test; @@ -29,7 +28,6 @@ import static org.junit.Assert.*; public class KVStoreSnapshotterSuite { - private final KVStoreSerializer serializer = new KVStoreSerializer(); private final KVStoreSnapshotter snapshotter = new KVStoreSnapshotter(serializer); diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java index 854afe5de8eb7..33c3e5529d05e 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java @@ -149,7 +149,8 @@ public void testMultipleTypesWriteReadDelete() throws Exception { db.write(t2); db.write(t3); - assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class), db.types()); + assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class), + db.types()); assertEquals(t1, db.read(t1.getClass(), t1.key)); assertEquals(t2, db.read(t2.getClass(), t2.key)); From d94dc9c2a343117ba8e0f40f2c3416bd0a2b1a47 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 17 Sep 2019 14:24:07 +0900 Subject: [PATCH 5/7] Unroll imports --- .../org/apache/spark/util/kvstore/InMemoryStore.java | 9 ++++++++- .../apache/spark/util/kvstore/KVStoreSnapshotter.java | 7 ++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index 231dafd70422c..5550573ea44b2 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -17,7 +17,14 @@ package org.apache.spark.util.kvstore; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.HashSet; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.BiConsumer; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java index 9513fabdb4a3b..caa6cc7469a88 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java @@ -19,7 +19,12 @@ import com.google.common.io.ByteStreams; -import java.io.*; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Set; From 922707eba30992725aa33ae67ac7bf399b4876cd Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 21 Sep 2019 10:15:00 +0900 Subject: [PATCH 6/7] Receive InputStream/OutputStream instead of File to be compatible with any filesystems --- .../util/kvstore/KVStoreSnapshotter.java | 47 +++++++++---------- .../util/kvstore/KVStoreSnapshotterSuite.java | 17 +++++-- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java index caa6cc7469a88..2aaba70127357 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java @@ -38,73 +38,70 @@ public KVStoreSnapshotter(KVStoreSerializer serializer) { this.serializer = serializer; } - public void dump(KVStore store, File snapshotFile) throws Exception { - DataOutputStream output = new DataOutputStream(new FileOutputStream(snapshotFile)); - + /** Dump current KVStore to the output stream - caller should close the output stream. */ + public void dump(KVStore store, DataOutputStream snapshotStream) throws Exception { // store metadata if it exists Class metadataType = store.metadataType(); if (metadataType != null) { - writeClassName(metadataType, output); + writeClassName(metadataType, snapshotStream); Object metadata = store.getMetadata(metadataType); - writeObject(metadata, output); - writeEndOfType(output); + writeObject(metadata, snapshotStream); + writeEndOfType(snapshotStream); } else { - writeEndOfType(output); + writeEndOfType(snapshotStream); } Set> types = store.types(); for (Class clazz : types) { - writeClassName(clazz, output); + writeClassName(clazz, snapshotStream); KVStoreView view = store.view(clazz); for (Object obj : view) { - writeObject(obj, output); + writeObject(obj, snapshotStream); } - writeEndOfType(output); + writeEndOfType(snapshotStream); } - writeEndOfFile(output); - output.close(); + writeEndOfFile(snapshotStream); } - public void restore(File snapshotFile, KVStore store) throws Exception { - DataInputStream input = new DataInputStream(new FileInputStream(snapshotFile)); - + /** Restore current KVStore from the input stream - caller should close the input stream. */ + public void restore(DataInputStream snapshotStream, KVStore store) throws Exception { // first one would be metadata - int metadataClazzLen = input.readInt(); + int metadataClazzLen = snapshotStream.readInt(); if (metadataClazzLen > 0) { - Class metadataClazz = readClassName(input, metadataClazzLen); + Class metadataClazz = readClassName(snapshotStream, metadataClazzLen); // metadata presented - int objLen = input.readInt(); - Object metadata = readObj(input, metadataClazz, objLen); + int objLen = snapshotStream.readInt(); + Object metadata = readObj(snapshotStream, metadataClazz, objLen); store.setMetadata(metadata); // additionally read -2 as end of type - consumeEndOfType(input); + consumeEndOfType(snapshotStream); } boolean eof = false; while (!eof) { - int typeClazzNameLen = input.readInt(); + int typeClazzNameLen = snapshotStream.readInt(); if (typeClazzNameLen == MARKER_END_OF_FILE) { eof = true; } else { - Class typeClazz = readClassName(input, typeClazzNameLen); + Class typeClazz = readClassName(snapshotStream, typeClazzNameLen); boolean eot = false; while (!eot) { - int objLen = input.readInt(); + int objLen = snapshotStream.readInt(); if (objLen == MARKER_END_OF_TYPE) { eot = true; } else { - Object obj = readObj(input, typeClazz, objLen); + Object obj = readObj(snapshotStream, typeClazz, objLen); store.write(obj); } } } } - input.close(); + snapshotStream.close(); } private void writeClassName(Class clazz, DataOutputStream output) throws IOException { diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java index dd584c906c0fb..182eebef57853 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java @@ -20,7 +20,11 @@ import org.apache.commons.io.FileUtils; import org.junit.Test; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -88,12 +92,17 @@ private void runTestSnapshotAndRestore(KVStore source, KVStore destination) thro snapshotFile.delete(); assertFalse(snapshotFile.exists()); + try { - snapshotter.dump(source, snapshotFile); - assertTrue(snapshotFile.exists() && snapshotFile.isFile()); - assertTrue(snapshotFile.length() > 0); + try (DataOutputStream output = new DataOutputStream(new FileOutputStream(snapshotFile))) { + snapshotter.dump(source, output); + assertTrue(snapshotFile.exists() && snapshotFile.isFile()); + assertTrue(snapshotFile.length() > 0); + } - snapshotter.restore(snapshotFile, destination); + try (DataInputStream input = new DataInputStream(new FileInputStream(snapshotFile))) { + snapshotter.restore(input, destination); + } Class metadataType = source.metadataType(); assertEquals(destination.metadataType(), metadataType); From 28cd3d573622843fdfdb694224b959634accfcb1 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sun, 8 Dec 2019 20:13:04 +0900 Subject: [PATCH 7/7] Lint fix --- .../java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java index 2aaba70127357..616b9aa168497 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java @@ -21,9 +21,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Set;