diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
index b33c53871c32f..5550573ea44b2 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
@@ -24,6 +24,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
@@ -106,6 +107,19 @@ public KVStoreView view(Class type){
     return list != null ? list.view() : emptyView();
   }
 
+  @Override
+  public Class<?> metadataType() {
+    if (metadata == null) {
+      return null;
+    }
+    return metadata.getClass();
+  }
+
+  @Override
+  public Set<Class<?>> types() {
+    return inMemoryLists.data.keySet();
+  }
+
   @Override
   public void close() {
     metadata = null;
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
index ac159eb43182f..c7f2872fd31a1 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
@@ -19,6 +19,7 @@
 
 import java.io.Closeable;
 import java.util.Collection;
+import java.util.Set;
 
 import org.apache.spark.annotation.Private;
 
@@ -117,6 +118,16 @@ public interface KVStore extends Closeable {
    */
   KVStoreView view(Class type) throws Exception;
 
+  /**
+   * Returns the class of the app-specific metadata in the store, or null if none is set.
+   */
+  Class<?> metadataType();
+
+  /**
+   * Returns the classes of all object types the store currently knows about.
+   */
+  Set<Class<?>> types();
+
   /**
    * Returns the number of items of the given type currently in the store.
*/ diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java new file mode 100644 index 0000000000000..616b9aa168497 --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import com.google.common.io.ByteStreams; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Set; + +public class KVStoreSnapshotter { + private static final int MARKER_END_OF_TYPE = -2; + private static final int MARKER_END_OF_FILE = -1; + + private final KVStoreSerializer serializer; + + public KVStoreSnapshotter(KVStoreSerializer serializer) { + this.serializer = serializer; + } + + /** Dump current KVStore to the output stream - caller should close the output stream. 
*/ + public void dump(KVStore store, DataOutputStream snapshotStream) throws Exception { + // store metadata if it exists + Class metadataType = store.metadataType(); + if (metadataType != null) { + writeClassName(metadataType, snapshotStream); + Object metadata = store.getMetadata(metadataType); + writeObject(metadata, snapshotStream); + writeEndOfType(snapshotStream); + } else { + writeEndOfType(snapshotStream); + } + + Set> types = store.types(); + for (Class clazz : types) { + writeClassName(clazz, snapshotStream); + + KVStoreView view = store.view(clazz); + for (Object obj : view) { + writeObject(obj, snapshotStream); + } + + writeEndOfType(snapshotStream); + } + + writeEndOfFile(snapshotStream); + } + + /** Restore current KVStore from the input stream - caller should close the input stream. */ + public void restore(DataInputStream snapshotStream, KVStore store) throws Exception { + // first one would be metadata + int metadataClazzLen = snapshotStream.readInt(); + if (metadataClazzLen > 0) { + Class metadataClazz = readClassName(snapshotStream, metadataClazzLen); + // metadata presented + int objLen = snapshotStream.readInt(); + Object metadata = readObj(snapshotStream, metadataClazz, objLen); + store.setMetadata(metadata); + + // additionally read -2 as end of type + consumeEndOfType(snapshotStream); + } + + boolean eof = false; + while (!eof) { + int typeClazzNameLen = snapshotStream.readInt(); + if (typeClazzNameLen == MARKER_END_OF_FILE) { + eof = true; + } else { + Class typeClazz = readClassName(snapshotStream, typeClazzNameLen); + boolean eot = false; + while (!eot) { + int objLen = snapshotStream.readInt(); + if (objLen == MARKER_END_OF_TYPE) { + eot = true; + } else { + Object obj = readObj(snapshotStream, typeClazz, objLen); + store.write(obj); + } + } + } + } + + snapshotStream.close(); + } + + private void writeClassName(Class clazz, DataOutputStream output) throws IOException { + byte[] clazzName = 
clazz.getCanonicalName().getBytes(StandardCharsets.UTF_8); + output.writeInt(clazzName.length); + output.write(clazzName); + } + + private void writeObject(Object obj, DataOutputStream output) throws Exception { + byte[] ser = serializer.serialize(obj); + output.writeInt(ser.length); + output.write(ser); + } + + private void writeEndOfType(DataOutputStream output) throws IOException { + output.writeInt(MARKER_END_OF_TYPE); + } + + private void writeEndOfFile(DataOutputStream output) throws IOException { + output.writeInt(MARKER_END_OF_FILE); + } + + private Class readClassName( + DataInputStream input, + int classNameLen) throws IOException, ClassNotFoundException { + byte[] classNameBuffer = new byte[classNameLen]; + ByteStreams.readFully(input, classNameBuffer, 0, classNameLen); + String className = new String(classNameBuffer, StandardCharsets.UTF_8); + return Class.forName(className); + } + + private Object readObj(DataInputStream input, Class clazz, int objLen) throws Exception { + byte[] objBuffer = new byte[objLen]; + ByteStreams.readFully(input, objBuffer, 0, objLen); + return serializer.deserialize(objBuffer, clazz); + } + + private void consumeEndOfType(DataInputStream input) throws IOException { + int eotCode = input.readInt(); + if (eotCode != MARKER_END_OF_TYPE) { + throw new IllegalStateException("The notion of 'end of type' is expected here, but got " + + eotCode + " instead"); + } + } +} diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 2ca4b0b2cb9f9..935a7a1f24629 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -63,6 +63,7 @@ public class LevelDB implements KVStore { */ private final ConcurrentMap typeAliases; private final ConcurrentMap, LevelDBTypeInfo> types; + private Class metadataType; public LevelDB(File path) 
throws Exception {
     this(path, new KVStoreSerializer());
@@ -109,8 +110,10 @@ public T getMetadata(Class klass) throws Exception {
   public void setMetadata(Object value) throws Exception {
     if (value != null) {
       put(METADATA_KEY, value);
+      metadataType = value.getClass();  // keep the tracked type in sync with the stored value
     } else {
       db().delete(METADATA_KEY);
+      metadataType = null;  // no metadata left, so there is no type to report
     }
   }
 
@@ -197,6 +200,16 @@ public Iterator iterator() {
     };
   }
 
+  @Override
+  public Class<?> metadataType() {
+    return metadataType;  // NOTE(review): set only by setMetadata(); verify after reopening a DB
+  }
+
+  @Override
+  public Set<Class<?>> types() {
+    return types.keySet();  // key view of the internal type map, not a copy
+  }
+
   @Override
   public boolean removeAllByIndexValues(
       Class klass,
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java
index dd53fdf0b1b4c..8c57bd8ee0f7a 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java
@@ -40,5 +40,4 @@ public boolean equals(Object o) {
   public int hashCode() {
     return Arrays.hashCode(key) ^ Arrays.hashCode(id);
   }
-
 }
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java
index 27dde6a9fbea1..8ca97d10b1619 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.kvstore;
 
-public class InMemoryIteratorSuite extends DBIteratorSuite {
+public class InMemoryIteratorSuite extends KVStoreIteratorSuite {
 
   @Override
   protected KVStore createStore() {
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
index 9e34225e14e18..47ac200616558 100644
---
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
@@ -17,9 +17,11 @@
 
 package org.apache.spark.util.kvstore;
 
+import java.util.Arrays;
 import java.util.NoSuchElementException;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -86,6 +88,54 @@ public void testMultipleObjectWriteReadDelete() throws Exception {
     }
   }
 
+  @Test
+  public void testMultipleTypesWriteReadDelete() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.id = "id";
+    t1.name = "name1";
+
+    IntKeyType t2 = new IntKeyType();
+    t2.key = 2;
+    t2.id = "2";
+    t2.values = Arrays.asList("value1", "value2");
+
+    ArrayKeyIndexType t3 = new ArrayKeyIndexType();
+    t3.key = new int[] { 42, 84 };
+    t3.id = new String[] { "id1", "id2" };
+
+    store.write(t1);
+    store.write(t2);
+    store.write(t3);
+
+    assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class),
+      store.types());
+
+    assertEquals(t1, store.read(t1.getClass(), t1.key));
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+    assertEquals(t3, store.read(t3.getClass(), t3.key));
+
+    // There should be one "id" index with a single entry for each type.
+    assertEquals(1, store.count(t1.getClass(), "id", t1.id));
+    assertEquals(1, store.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, store.count(t3.getClass(), "id", t3.id));
+
+    // Delete the first entry; this should not affect the entries for the other types.
+    store.delete(t1.getClass(), t1.key);
+    assertEquals(0, store.count(t1.getClass()));
+    assertEquals(1, store.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, store.count(t3.getClass(), "id", t3.id));
+
+    // Delete the remaining entries, make sure all data is gone.
+    store.delete(t2.getClass(), t2.key);
+    assertEquals(0, store.count(t2.getClass()));
+
+    store.delete(t3.getClass(), t3.key);
+    assertEquals(0, store.count(t3.getClass()));
+  }
+
   @Test
   public void testMetadata() throws Exception {
     KVStore store = new InMemoryStore();
@@ -96,9 +146,11 @@ public void testMetadata() throws Exception {
     t.name = "name";
 
     store.setMetadata(t);
+    assertEquals(CustomType1.class, store.metadataType());
     assertEquals(t, store.getMetadata(CustomType1.class));
 
     store.setMetadata(null);
+    assertNull(store.metadataType());
     assertNull(store.getMetadata(CustomType1.class));
   }
 
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/IntKeyType.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/IntKeyType.java
new file mode 100644
index 0000000000000..81f251e8f6c41
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/IntKeyType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.util.List;
+
+public class IntKeyType {
+  @KVIndex
+  public int key;  // int-typed key (unnamed @KVIndex)
+
+  @KVIndex("id")
+  public String id;  // indexed under the name "id"
+
+  public List<String> values;
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof IntKeyType) {
+      IntKeyType other = (IntKeyType) o;
+      return key == other.key && id.equals(other.id) && values.equals(other.values);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();  // assumes id is non-null, like equals() above
+  }
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreIteratorSuite.java
similarity index 98%
rename from common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
rename to common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreIteratorSuite.java
index 1e062437d1803..74907af3ec7cb 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreIteratorSuite.java
@@ -36,9 +36,9 @@
 import org.slf4j.LoggerFactory;
 import static org.junit.Assert.*;
 
-public abstract class DBIteratorSuite {
+public abstract class KVStoreIteratorSuite {
 
-  private static final Logger LOG = LoggerFactory.getLogger(DBIteratorSuite.class);
+  private static final Logger LOG = LoggerFactory.getLogger(KVStoreIteratorSuite.class);
 
   private static final int MIN_ENTRIES = 42;
   private static final int MAX_ENTRIES = 1024;
@@ -380,7 +380,7 @@ public void childIndexDescendingWithLast() throws Exception {
 
   @Test
   public void testRefWithIntNaturalKey() throws Exception {
-    LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
+    IntKeyType i = new IntKeyType();
     i.key = 1;
     i.id = "1";
     i.values = Arrays.asList("1");
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java
new file mode 100644
index 0000000000000..182eebef57853
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class KVStoreSnapshotterSuite {
+  private final KVStoreSerializer serializer = new KVStoreSerializer();
+  private final KVStoreSnapshotter snapshotter = new KVStoreSnapshotter(serializer);
+
+  @Test
+  public void testMetadataEnabled() throws Exception {
+    KVStore src = new InMemoryStore();
+    KVStore dest = new InMemoryStore();
+
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+    src.setMetadata(t);
+
+    prepareTestObjects(src);
+    runTestSnapshotAndRestore(src, dest);
+  }
+
+  @Test
+  public void testMetadataNotAvailable() throws Exception {
+    KVStore src = new InMemoryStore();
+    KVStore dest = new InMemoryStore();
+
+    prepareTestObjects(src);
+    runTestSnapshotAndRestore(src, dest);
+  }
+
+  private void prepareTestObjects(KVStore testStore) throws Exception {
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.id = "id";
+    t2.name = "name2";
+
+    ArrayKeyIndexType t3 = new ArrayKeyIndexType();
+    t3.key = new int[] { 42, 84 };
+    t3.id = new String[] { "id1", "id2" };
+
+    IntKeyType t4 = new IntKeyType();
+    t4.key = 2;
+    t4.id = "2";
+    t4.values = Arrays.asList("value1", "value2");
+
+    testStore.write(t);
+    testStore.write(t2);
+    testStore.write(t3);
+    testStore.write(t4);
+  }
+
+  // Dumps the source store to a temporary file, restores it into the destination store and
+  // checks that metadata, types and objects match between the two stores.
+  // source is expected to have some objects and, optionally, metadata; destination must be empty
+  private void runTestSnapshotAndRestore(KVStore source, KVStore destination) throws Exception {
+    File snapshotFile = File.createTempFile("test-kvstore", ".snapshot");
+    snapshotFile.delete();
+    assertFalse(snapshotFile.exists());
+
+    try {
+      try (DataOutputStream output = new DataOutputStream(new FileOutputStream(snapshotFile))) {
+        snapshotter.dump(source, output);
+        assertTrue(snapshotFile.exists() && snapshotFile.isFile());
+        assertTrue(snapshotFile.length() > 0);
+      }
+
+      try (DataInputStream input = new DataInputStream(new FileInputStream(snapshotFile))) {
+        snapshotter.restore(input, destination);
+      }
+
+      // JUnit takes the expected value first: the source store is the reference here.
+      Class<?> metadataType = source.metadataType();
+      assertEquals(metadataType, destination.metadataType());
+      if (metadataType != null) {
+        assertEquals(source.getMetadata(metadataType), destination.getMetadata(metadataType));
+      }
+
+      Set<Class<?>> objectTypes = source.types();
+      assertEquals(objectTypes, destination.types());
+      for (Class<?> tpe : objectTypes) {
+        assertEquals(readAll(source, tpe), readAll(destination, tpe));
+      }
+    } finally {
+      FileUtils.deleteQuietly(snapshotFile);
+    }
+  }
+
+  // Collects every object of the given type, closing the store iterator afterwards.
+  private Set<Object> readAll(KVStore store, Class<?> type) throws Exception {
+    Set<Object> objects = new HashSet<>();
+    try (KVStoreIterator<?> it = store.view(type).closeableIterator()) {
+      it.forEachRemaining(objects::add);
+    }
+    return objects;
+  }
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java
index f8195da58cf9f..5e9d171785c98 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java
@@ -22,7 +22,7 @@
 import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 
-public class LevelDBIteratorSuite extends DBIteratorSuite {
+public class LevelDBIteratorSuite extends KVStoreIteratorSuite {
 
   private static File dbpath;
   private static LevelDB db;
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index
0b755ba0e8000..33c3e5529d05e 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -25,6 +25,7 @@
 import java.util.stream.StreamSupport;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.iq80.leveldb.DBIterator;
 import org.junit.After;
@@ -148,6 +149,9 @@ public void testMultipleTypesWriteReadDelete() throws Exception {
     db.write(t2);
     db.write(t3);
 
+    assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class),
+      db.types());  // all three written types must be reported
+
     assertEquals(t1, db.read(t1.getClass(), t1.key));
     assertEquals(t2, db.read(t2.getClass(), t2.key));
     assertEquals(t3, db.read(t3.getClass(), t3.key));
@@ -178,9 +182,11 @@ public void testMetadata() throws Exception {
     CustomType1 t = createCustomType1(1);
 
     db.setMetadata(t);
+    assertEquals(CustomType1.class, db.metadataType());  // type is reported once metadata is set
     assertEquals(t, db.getMetadata(CustomType1.class));
 
     db.setMetadata(null);
+    assertNull(db.metadataType());  // and cleared together with the metadata
     assertNull(db.getMetadata(CustomType1.class));
   }
 
@@ -302,31 +308,4 @@ private int countKeys(Class type) throws Exception {
 
     return count;
   }
-
-  public static class IntKeyType {
-
-    @KVIndex
-    public int key;
-
-    @KVIndex("id")
-    public String id;
-
-    public List values;
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof IntKeyType) {
-        IntKeyType other = (IntKeyType) o;
-        return key == other.key && id.equals(other.id) && values.equals(other.values);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return id.hashCode();
-    }
-
-  }
-
 }
diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
index 38cb030297c81..5658571e0c486 100644
--- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
+++
b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.status
 
+import java.util  // referenced below as util.Set
 import java.util.Collection
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
@@ -197,6 +198,9 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
       threshold: Long,
       action: Long => Unit)
 
+  override def metadataType(): Class[_] = store.metadataType()  // delegates to the wrapped store
+
+  override def types(): util.Set[Class[_]] = store.types()  // delegates to the wrapped store
 }
 
 private[spark] object ElementTrackingStore {