Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -106,6 +107,19 @@ public <T> KVStoreView<T> view(Class<T> type){
return list != null ? list.view() : emptyView();
}

@Override
public Class<?> metadataType() {
if (metadata == null) {
return null;
}
return metadata.getClass();
}

@Override
public Set<Class<?>> types() {
return inMemoryLists.data.keySet();
}

@Override
public void close() {
metadata = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Closeable;
import java.util.Collection;
import java.util.Set;

import org.apache.spark.annotation.Private;

Expand Down Expand Up @@ -117,6 +118,16 @@ public interface KVStore extends Closeable {
*/
<T> KVStoreView<T> view(Class<T> type) throws Exception;

/**
* Returns a type of app-specific metadata from the store, or null if it's not currently set.
*/
Class<?> metadataType();

/**
* Returns all available types of all objects.
*/
Set<Class<?>> types();

/**
* Returns the number of items of the given type currently in the store.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.kvstore;

import com.google.common.io.ByteStreams;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Set;

public class KVStoreSnapshotter {
private static final int MARKER_END_OF_TYPE = -2;
private static final int MARKER_END_OF_FILE = -1;

private final KVStoreSerializer serializer;

public KVStoreSnapshotter(KVStoreSerializer serializer) {
this.serializer = serializer;
}

/** Dump current KVStore to the output stream - caller should close the output stream. */
public void dump(KVStore store, DataOutputStream snapshotStream) throws Exception {
// store metadata if it exists
Class<?> metadataType = store.metadataType();
if (metadataType != null) {
writeClassName(metadataType, snapshotStream);
Object metadata = store.getMetadata(metadataType);
writeObject(metadata, snapshotStream);
writeEndOfType(snapshotStream);
} else {
writeEndOfType(snapshotStream);
}

Set<Class<?>> types = store.types();
for (Class<?> clazz : types) {
writeClassName(clazz, snapshotStream);

KVStoreView<?> view = store.view(clazz);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering will there be a type with empty objects in the KVStore. Normally, it seems impossible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure any implementations allow type with empty objects.

I see what you say - if there's some implementation allow the case like types() returning Class but view(A.class) contains nothing, we don't provide the way to only add type to KVStore. We may want to be clear in the interface javadoc that "type with empty objects are ignored while recovering, so implementations should not rely on this", as thinking theoretically, but I'm afraid I might be over-thinking.

for (Object obj : view) {
writeObject(obj, snapshotStream);
}

writeEndOfType(snapshotStream);
}

writeEndOfFile(snapshotStream);
}

/** Restore current KVStore from the input stream - caller should close the input stream. */
public void restore(DataInputStream snapshotStream, KVStore store) throws Exception {
// first one would be metadata
int metadataClazzLen = snapshotStream.readInt();
if (metadataClazzLen > 0) {
Class<?> metadataClazz = readClassName(snapshotStream, metadataClazzLen);
// metadata presented
int objLen = snapshotStream.readInt();
Object metadata = readObj(snapshotStream, metadataClazz, objLen);
store.setMetadata(metadata);

// additionally read -2 as end of type
consumeEndOfType(snapshotStream);
}

boolean eof = false;
while (!eof) {
int typeClazzNameLen = snapshotStream.readInt();
if (typeClazzNameLen == MARKER_END_OF_FILE) {
eof = true;
} else {
Class<?> typeClazz = readClassName(snapshotStream, typeClazzNameLen);
boolean eot = false;
while (!eot) {
int objLen = snapshotStream.readInt();
if (objLen == MARKER_END_OF_TYPE) {
eot = true;
} else {
Object obj = readObj(snapshotStream, typeClazz, objLen);
store.write(obj);
}
}
}
}

snapshotStream.close();
}

private void writeClassName(Class<?> clazz, DataOutputStream output) throws IOException {
byte[] clazzName = clazz.getCanonicalName().getBytes(StandardCharsets.UTF_8);
output.writeInt(clazzName.length);
output.write(clazzName);
}

private void writeObject(Object obj, DataOutputStream output) throws Exception {
byte[] ser = serializer.serialize(obj);
output.writeInt(ser.length);
output.write(ser);
}

private void writeEndOfType(DataOutputStream output) throws IOException {
output.writeInt(MARKER_END_OF_TYPE);
}

private void writeEndOfFile(DataOutputStream output) throws IOException {
output.writeInt(MARKER_END_OF_FILE);
}

private Class<?> readClassName(
DataInputStream input,
int classNameLen) throws IOException, ClassNotFoundException {
byte[] classNameBuffer = new byte[classNameLen];
ByteStreams.readFully(input, classNameBuffer, 0, classNameLen);
String className = new String(classNameBuffer, StandardCharsets.UTF_8);
return Class.forName(className);
}

private Object readObj(DataInputStream input, Class<?> clazz, int objLen) throws Exception {
byte[] objBuffer = new byte[objLen];
ByteStreams.readFully(input, objBuffer, 0, objLen);
return serializer.deserialize(objBuffer, clazz);
}

private void consumeEndOfType(DataInputStream input) throws IOException {
int eotCode = input.readInt();
if (eotCode != MARKER_END_OF_TYPE) {
throw new IllegalStateException("The notion of 'end of type' is expected here, but got " +
eotCode + " instead");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class LevelDB implements KVStore {
*/
private final ConcurrentMap<String, byte[]> typeAliases;
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
private Class<?> metadataType;

public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
Expand Down Expand Up @@ -109,8 +110,10 @@ public <T> T getMetadata(Class<T> klass) throws Exception {
public void setMetadata(Object value) throws Exception {
if (value != null) {
put(METADATA_KEY, value);
metadataType = value.getClass();
} else {
db().delete(METADATA_KEY);
metadataType = null;
}
}

Expand Down Expand Up @@ -197,6 +200,16 @@ public Iterator<T> iterator() {
};
}

@Override
public Class<?> metadataType() {
return metadataType;
}

@Override
public Set<Class<?>> types() {
return types.keySet();
}

@Override
public <T> boolean removeAllByIndexValues(
Class<T> klass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ public boolean equals(Object o) {
public int hashCode() {
return Arrays.hashCode(key) ^ Arrays.hashCode(id);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.util.kvstore;

public class InMemoryIteratorSuite extends DBIteratorSuite {
public class InMemoryIteratorSuite extends KVStoreIteratorSuite {

@Override
protected KVStore createStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.util.kvstore;

import java.util.Arrays;
import java.util.NoSuchElementException;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.junit.Test;
import static org.junit.Assert.*;

Expand Down Expand Up @@ -86,6 +88,53 @@ public void testMultipleObjectWriteReadDelete() throws Exception {
}
}

@Test
public void testMultipleTypesWriteReadDelete() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was only available for LevelDBSuite so I copied here to test new API addition.

KVStore store = new InMemoryStore();

CustomType1 t1 = new CustomType1();
t1.key = "key1";
t1.id = "id";
t1.name = "name1";

IntKeyType t2 = new IntKeyType();
t2.key = 2;
t2.id = "2";
t2.values = Arrays.asList("value1", "value2");

ArrayKeyIndexType t3 = new ArrayKeyIndexType();
t3.key = new int[] { 42, 84 };
t3.id = new String[] { "id1", "id2" };

store.write(t1);
store.write(t2);
store.write(t3);

assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class),
store.types());

assertEquals(t1, store.read(t1.getClass(), t1.key));
assertEquals(t2, store.read(t2.getClass(), t2.key));
assertEquals(t3, store.read(t3.getClass(), t3.key));

// There should be one "id" index with a single entry for each type.
assertEquals(1, store.count(t1.getClass(), "id", t1.id));
assertEquals(1, store.count(t2.getClass(), "id", t2.id));
assertEquals(1, store.count(t3.getClass(), "id", t3.id));

// Delete the first entry; this should not affect the entries for the second type.
store.delete(t1.getClass(), t1.key);
assertEquals(1, store.count(t2.getClass(), "id", t2.id));
assertEquals(1, store.count(t3.getClass(), "id", t3.id));

// Delete the remaining entries, make sure all data is gone.
store.delete(t2.getClass(), t2.key);
assertEquals(0, store.count(t2.getClass()));

store.delete(t3.getClass(), t3.key);
assertEquals(0, store.count(t3.getClass()));
}

@Test
public void testMetadata() throws Exception {
KVStore store = new InMemoryStore();
Expand All @@ -96,9 +145,11 @@ public void testMetadata() throws Exception {
t.name = "name";

store.setMetadata(t);
assertEquals(CustomType1.class, store.metadataType());
assertEquals(t, store.getMetadata(CustomType1.class));

store.setMetadata(null);
assertNull(store.metadataType());
assertNull(store.getMetadata(CustomType1.class));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.kvstore;

import java.util.List;

public class IntKeyType {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved out of LevelDBSuite to co-use between test suites.

@KVIndex
public int key;

@KVIndex("id")
public String id;

public List<String> values;

@Override
public boolean equals(Object o) {
if (o instanceof IntKeyType) {
IntKeyType other = (IntKeyType) o;
return key == other.key && id.equals(other.id) && values.equals(other.values);
}
return false;
}

@Override
public int hashCode() {
return id.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;

public abstract class DBIteratorSuite {
public abstract class KVStoreIteratorSuite {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just renamed this as it made me confused - I imagined DB as LevelDB but there's separate suite for LevelDB. KVStore sounds better to me.


private static final Logger LOG = LoggerFactory.getLogger(DBIteratorSuite.class);
private static final Logger LOG = LoggerFactory.getLogger(KVStoreIteratorSuite.class);

private static final int MIN_ENTRIES = 42;
private static final int MAX_ENTRIES = 1024;
Expand Down Expand Up @@ -380,7 +380,7 @@ public void childIndexDescendingWithLast() throws Exception {

@Test
public void testRefWithIntNaturalKey() throws Exception {
LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
IntKeyType i = new IntKeyType();
i.key = 1;
i.id = "1";
i.values = Arrays.asList("1");
Expand Down
Loading