diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/GenericArrayKey.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/GenericArrayKey.java new file mode 100644 index 0000000000000..e5594686c83dc --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/GenericArrayKey.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; + +import com.google.common.base.Objects; + +/** + * A class that wraps a generic array so that it can be used as a key in a map, sorted or not. + * + * The implementation of {@link #compareTo(GenericArrayKey)} makes two assumptions: + * - All elements are instances of Comparable + * - When comparing two arrays, they both contain elements of the same type in corresponding + * indices. + * + * Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays. + * + * This class is not efficient and is mostly meant to compare really small arrays, like those + * generally used as indices and keys in a KVStore. + */ +public class GenericArrayKey implements Comparable { + + private final Object key; + + public GenericArrayKey(Object key) { + this.key = key; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof GenericArrayKey)) { + return false; + } + + Object comp = ((GenericArrayKey) other).key; + int l1 = Array.getLength(key); + int l2 = Array.getLength(comp); + + if (l1 != l2) { + return false; + } + + for (int i = 0; i < l1; i++) { + if (!Objects.equal(Array.get(key, i), Array.get(comp, i))) { + return false; + } + } + + return true; + } + + @Override + public int hashCode() { + int code = 0; + int length = Array.getLength(key); + for (int i = 0; i < length; i++) { + code += Array.get(key, i).hashCode(); + } + return 31 * code; + } + + @Override + @SuppressWarnings("unchecked") + public int compareTo(GenericArrayKey other) { + int len = Math.min(Array.getLength(key), Array.getLength(other.key)); + for (int i = 0; i < len; i++) { + int diff = ((Comparable) Array.get(key, i)).compareTo( + (Comparable) Array.get(other.key, i)); + if (diff != 0) { + return diff; + } + } + + return Array.getLength(key) - Array.getLength(other.key); + } + +} diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java new file mode 100644 index 0000000000000..78b71a0539db8 --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +/** + * Implementation of KVStore that keeps data deserialized in memory. This store does not index + * data; instead, whenever iterating over an indexed field, the stored data is copied and sorted + * according to the index. This saves memory but makes iteration more expensive. + */ +public class InMemoryStore implements KVStore { + + private Object metadata; + private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); + + @Override + public T getMetadata(Class klass) { + return klass.cast(metadata); + } + + @Override + public void setMetadata(Object value) { + this.metadata = value; + } + + @Override + public long count(Class type) { + InstanceList list = data.get(type); + return list != null ? list.size() : 0; + } + + @Override + public long count(Class type, String index, Object indexedValue) throws Exception { + InstanceList list = data.get(type); + int count = 0; + for (Object o : view(type)) { + if (Objects.equal(indexedValue, list.getIndexValue(o, index))) { + count++; + } + } + return count; + } + + @Override + public T read(Class klass, Object naturalKey) { + InstanceList list = data.get(klass); + Object value = list != null ? list.get(naturalKey) : null; + if (value == null) { + throw new NoSuchElementException(); + } + return klass.cast(value); + } + + @Override + public void write(Object value) throws Exception { + InstanceList list = data.computeIfAbsent(value.getClass(), key -> { + try { + return new InstanceList(key); + } catch (Exception e) { + throw Throwables.propagate(e); + } + }); + list.put(value); + } + + @Override + public void delete(Class type, Object naturalKey) { + InstanceList list = data.get(type); + if (list != null) { + list.delete(naturalKey); + } + } + + @Override + public KVStoreView view(Class type){ + InstanceList list = data.get(type); + return list != null ? 
list.view(type) + : new InMemoryView<>(type, Collections.emptyList(), null); + } + + @Override + public void close() { + metadata = null; + data.clear(); + } + + @SuppressWarnings("unchecked") + private static Comparable asKey(Object in) { + if (in.getClass().isArray()) { + in = new GenericArrayKey(in); + } + return (Comparable) in; + } + + private static class InstanceList { + + private final KVTypeInfo ti; + private final KVTypeInfo.Accessor naturalKey; + private final ConcurrentMap, Object> data; + + private int size; + + private InstanceList(Class type) throws Exception { + this.ti = new KVTypeInfo(type); + this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); + this.data = new ConcurrentHashMap<>(); + this.size = 0; + } + + Object getIndexValue(Object o, String indexName) throws Exception { + return ti.getAccessor(indexName).get(o); + } + + public Object get(Object key) { + return data.get(asKey(key)); + } + + public void put(Object value) throws Exception { + Preconditions.checkArgument(ti.type().equals(value.getClass()), + "Unexpected type: %s", value.getClass()); + if (data.put(asKey(naturalKey.get(value)), value) == null) { + size++; + } + } + + public void delete(Object key) { + if (data.remove(asKey(key)) != null) { + size--; + } + } + + public int size() { + return size; + } + + @SuppressWarnings("unchecked") + public InMemoryView view(Class type) { + Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type); + Collection all = (Collection) data.values(); + return new InMemoryView(type, all, ti); + } + + } + + private static class InMemoryView extends KVStoreView { + + private final Collection elements; + private final KVTypeInfo ti; + private final KVTypeInfo.Accessor natural; + + InMemoryView(Class type, Collection elements, KVTypeInfo ti) { + super(type); + this.elements = elements; + this.ti = ti; + this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null; + } + + @Override + public Iterator iterator() { + if (elements.isEmpty()) { + return new InMemoryIterator<>(elements.iterator()); + } + + Preconditions.checkArgument(index != null || first == null, + "First requires sorting to be configured for the view."); + Preconditions.checkArgument(index != null || last == null, + "Last requires sorting to be configured for the view."); + + try { + KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null; + int modifier = ascending ? 1 : -1; + + final Collection iterable; + if (index != null) { + List copy = copyElements(); + Collections.sort(copy, (e1, e2) -> modifier * compare(e1, e2, getter)); + iterable = copy; + } else { + iterable = this.elements; + } + + Stream stream = iterable.stream(); + + if (first != null) { + stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0); + } + + if (last != null) { + stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0); + } + + if (skip > 0) { + stream = stream.skip(skip); + } + + if (max < iterable.size()) { + stream = stream.limit((int) max); + } + + return new InMemoryIterator<>(stream.iterator()); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + /** + * Create a copy of the input elements, filtering the values for child indices if needed. 
+ */ + private List copyElements() { + if (parent != null) { + KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); + Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); + + return elements.stream() + .filter(e -> compare(e, parentGetter, parent) == 0) + .collect(Collectors.toList()); + } else { + return new ArrayList<>(elements); + } + } + + private int compare(T e1, T e2, KVTypeInfo.Accessor getter) { + try { + int diff = compare(e1, getter, getter.get(e2)); + if (diff == 0 && getter != natural) { + diff = compare(e1, natural, natural.get(e2)); + } + return diff; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) { + try { + return asKey(getter.get(e1)).compareTo(asKey(v2)); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + } + + private static class InMemoryIterator implements KVStoreIterator { + + private final Iterator iter; + + InMemoryIterator(Iterator iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public T next() { + return iter.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public List next(int max) { + List list = new ArrayList<>(max); + while (hasNext() && list.size() < max) { + list.add(next()); + } + return list; + } + + @Override + public boolean skip(long n) { + long skipped = 0; + while (skipped < n) { + if (hasNext()) { + next(); + skipped++; + } else { + return false; + } + } + + return true; + } + + @Override + public void close() { + // no op. + } + + } + +} diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java index cc5874e958873..ebf5e2cd4d7bd 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java @@ -35,13 +35,20 @@ * when used in a for loop that exhausts their contents, but when used manually, they need * to be closed explicitly unless all elements are read. *

+ * + *

+ * By default, views do not have a pre-defined ordering. To define an order, call either + * {@link #sorted()} or {@link #index(String)} to define the desired order of elements. + * Some features, such as defining the first and last element to be returned, require + * an order to be defined. + *

*/ public abstract class KVStoreView implements Iterable { final Class type; boolean ascending = true; - String index = KVIndex.NATURAL_INDEX_NAME; + String index = null; Object first = null; Object last = null; Object parent = null; @@ -60,6 +67,13 @@ public KVStoreView reverse() { return this; } + /** + * Sorts the view according to the natural index. + */ + public KVStoreView sorted() { + return index(KVIndex.NATURAL_INDEX_NAME); + } + /** * Iterates according to the given index. */ diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java index 19a559f902815..889d57718969d 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java @@ -89,7 +89,7 @@ private void checkIndex(KVIndex idx, Map indices) { "Duplicate index %s for type %s.", idx.value(), type.getName()); } - public Class getType() { + public Class type() { return type; } diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java index 2143a0e877389..76945d044c680 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java @@ -52,7 +52,7 @@ class LevelDBIterator implements KVStoreIterator { this.it = db.db().iterator(); this.type = params.type; this.ti = db.getTypeInfo(type); - this.index = ti.index(params.index); + this.index = ti.index(params.index != null ? params.index : KVIndex.NATURAL_INDEX_NAME); this.max = params.max; Preconditions.checkArgument(!index.isChild() || params.parent != null, @@ -172,6 +172,18 @@ public synchronized void close() throws IOException { } } + /** + * Because it's tricky to expose closeable iterators through many internal APIs, especially + * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by + * the iterator will eventually be released. + */ + @Override + protected void finalize() throws Throwable { + if (db.db() != null) { + close(); + } + } + private T loadNext() { if (count >= max) { return null; diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java new file mode 100644 index 0000000000000..d5938acc3e80e --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.kvstore; + +import java.util.Arrays; + +public class ArrayKeyIndexType { + + @KVIndex + public int[] key; + + @KVIndex("id") + public String[] id; + + @Override + public boolean equals(Object o) { + if (o instanceof ArrayKeyIndexType) { + ArrayKeyIndexType other = (ArrayKeyIndexType) o; + return Arrays.equals(key, other.key) && Arrays.equals(id, other.id); + } + return false; + } + + @Override + public int hashCode() { + return key.hashCode(); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java index 32489a2174c21..6684bf367e2d1 100644 --- a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java @@ -127,7 +127,9 @@ public void setup() throws Exception { // the same way the store is expected to. CustomType1 first = allEntries.get(0); clashingEntries = new ArrayList<>(); - for (int i = 0; i < RND.nextInt(MIN_ENTRIES) + 1; i++) { + + int clashCount = RND.nextInt(MIN_ENTRIES) + 1; + for (int i = 0; i < clashCount; i++) { CustomType1 t = new CustomType1(); t.key = "n-key" + (count + i); t.id = first.id; @@ -154,7 +156,7 @@ public void setup() throws Exception { @Test public void naturalIndex() throws Exception { - testIteration(NATURAL_ORDER, view(), null, null); + testIteration(NATURAL_ORDER, view().sorted(), null, null); } @Test @@ -180,7 +182,7 @@ public void childIndex() throws Exception { @Test public void naturalIndexDescending() throws Exception { - testIteration(NATURAL_ORDER, view().reverse(), null, null); + testIteration(NATURAL_ORDER, view().sorted().reverse(), null, null); } @Test @@ -207,7 +209,7 @@ public void childIndexDescending() throws Exception { @Test public void naturalIndexWithStart() throws Exception { CustomType1 first = pickLimit(); - testIteration(NATURAL_ORDER, view().first(first.key), first, null); + testIteration(NATURAL_ORDER, view().sorted().first(first.key), first, null); } @Test @@ -238,7 +240,7 @@ public void childIndexWithStart() throws Exception { @Test public void naturalIndexDescendingWithStart() throws Exception { CustomType1 first = pickLimit(); - testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null); + testIteration(NATURAL_ORDER, view().sorted().reverse().first(first.key), first, null); } @Test @@ -268,7 +270,7 @@ public void childIndexDescendingWithStart() throws Exception { @Test public void naturalIndexWithSkip() throws Exception { - testIteration(NATURAL_ORDER, view().skip(pickCount()), null, null); + testIteration(NATURAL_ORDER, view().sorted().skip(pickCount()), null, null); } @Test @@ -290,7 +292,7 @@ public void childIndexWithSkip() throws Exception { @Test public void naturalIndexWithMax() throws Exception { - testIteration(NATURAL_ORDER, view().max(pickCount()), null, null); + testIteration(NATURAL_ORDER, view().sorted().max(pickCount()), null, null); } @Test @@ -308,7 +310,7 @@ public void childIndexWithMax() throws Exception { @Test public void naturalIndexWithLast() throws Exception { CustomType1 last = pickLimit(); - testIteration(NATURAL_ORDER, view().last(last.key), null, last); + testIteration(NATURAL_ORDER, view().sorted().last(last.key), null, last); } @Test @@ -339,7 +341,7 @@ public void childIndexWithLast() throws Exception { @Test public void naturalIndexDescendingWithLast() throws Exception { CustomType1 last = pickLimit(); - testIteration(NATURAL_ORDER, 
view().reverse().last(last.key), null, last); + testIteration(NATURAL_ORDER, view().sorted().reverse().last(last.key), null, last); } @Test diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/GenericArrayKeySuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/GenericArrayKeySuite.java new file mode 100644 index 0000000000000..3f7f4299387ec --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/GenericArrayKeySuite.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class GenericArrayKeySuite { + + @Test + public void testGenericArrayKey() { + byte[] b1 = new byte[] { 0x01, 0x02, 0x03 }; + byte[] b2 = new byte[] { 0x01, 0x02 }; + int[] i1 = new int[] { 1, 2, 3 }; + int[] i2 = new int[] { 1, 2 }; + String[] s1 = new String[] { "1", "2", "3" }; + String[] s2 = new String[] { "1", "2" }; + + assertEquals(new GenericArrayKey(b1), new GenericArrayKey(b1)); + assertNotEquals(new GenericArrayKey(b1), new GenericArrayKey(b2)); + assertNotEquals(new GenericArrayKey(b1), new GenericArrayKey(i1)); + assertNotEquals(new GenericArrayKey(b1), new GenericArrayKey(s1)); + + assertEquals(new GenericArrayKey(i1), new GenericArrayKey(i1)); + assertNotEquals(new GenericArrayKey(i1), new GenericArrayKey(i2)); + assertNotEquals(new GenericArrayKey(i1), new GenericArrayKey(b1)); + assertNotEquals(new GenericArrayKey(i1), new GenericArrayKey(s1)); + + assertEquals(new GenericArrayKey(s1), new GenericArrayKey(s1)); + assertNotEquals(new GenericArrayKey(s1), new GenericArrayKey(s2)); + assertNotEquals(new GenericArrayKey(s1), new GenericArrayKey(b1)); + assertNotEquals(new GenericArrayKey(s1), new GenericArrayKey(i1)); + + assertEquals(0, new GenericArrayKey(b1).compareTo(new GenericArrayKey(b1))); + assertTrue(new GenericArrayKey(b1).compareTo(new GenericArrayKey(b2)) > 0); + + assertEquals(0, new GenericArrayKey(i1).compareTo(new GenericArrayKey(i1))); + assertTrue(new GenericArrayKey(i1).compareTo(new GenericArrayKey(i2)) > 0); + + assertEquals(0, new GenericArrayKey(s1).compareTo(new GenericArrayKey(s1))); + assertTrue(new GenericArrayKey(s1).compareTo(new GenericArrayKey(s2)) > 0); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java new file mode 100644 index 0000000000000..57ee4f6dd7cb6 --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +public class InMemoryIteratorSuite extends DBIteratorSuite { + + @Override + protected KVStore createStore() { + return new InMemoryStore(); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java new file mode 100644 index 0000000000000..e9f00be8d4f03 --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.util.NoSuchElementException; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class InMemoryStoreSuite { + + @Test + public void testObjectWriteReadDelete() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t = new CustomType1(); + t.key = "key"; + t.id = "id"; + t.name = "name"; + + try { + store.read(CustomType1.class, t.key); + fail("Expected exception for non-existant object."); + } catch (NoSuchElementException nsee) { + // Expected. + } + + store.write(t); + assertEquals(t, store.read(t.getClass(), t.key)); + assertEquals(1L, store.count(t.getClass())); + + store.delete(t.getClass(), t.key); + try { + store.read(t.getClass(), t.key); + fail("Expected exception for deleted object."); + } catch (NoSuchElementException nsee) { + // Expected. 
+ } + } + + @Test + public void testMultipleObjectWriteReadDelete() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t1 = new CustomType1(); + t1.key = "key1"; + t1.id = "id"; + t1.name = "name1"; + + CustomType1 t2 = new CustomType1(); + t2.key = "key2"; + t2.id = "id"; + t2.name = "name2"; + + store.write(t1); + store.write(t2); + + assertEquals(t1, store.read(t1.getClass(), t1.key)); + assertEquals(t2, store.read(t2.getClass(), t2.key)); + assertEquals(2L, store.count(t1.getClass())); + + store.delete(t1.getClass(), t1.key); + assertEquals(t2, store.read(t2.getClass(), t2.key)); + store.delete(t2.getClass(), t2.key); + try { + store.read(t2.getClass(), t2.key); + fail("Expected exception for deleted object."); + } catch (NoSuchElementException nsee) { + // Expected. + } + } + + @Test + public void testMetadata() throws Exception { + KVStore store = new InMemoryStore(); + assertNull(store.getMetadata(CustomType1.class)); + + CustomType1 t = new CustomType1(); + t.id = "id"; + t.name = "name"; + + store.setMetadata(t); + assertEquals(t, store.getMetadata(CustomType1.class)); + + store.setMetadata(null); + assertNull(store.getMetadata(CustomType1.class)); + } + + @Test + public void testUpdate() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t = new CustomType1(); + t.key = "key"; + t.id = "id"; + t.name = "name"; + + store.write(t); + + t.name = "anotherName"; + + store.write(t); + assertEquals(1, store.count(t.getClass())); + assertSame(t, store.read(t.getClass(), t.key)); + } + + @Test + public void testArrayIndices() throws Exception { + KVStore store = new InMemoryStore(); + + ArrayKeyIndexType o = new ArrayKeyIndexType(); + o.key = new int[] { 1, 2 }; + o.id = new String[] { "3", "4" }; + + store.write(o); + assertEquals(o, store.read(ArrayKeyIndexType.class, o.key)); + assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next()); + } + + @Test + public void testBasicIteration() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t1 = new CustomType1(); + t1.key = "1"; + t1.id = "id1"; + t1.name = "name1"; + store.write(t1); + + CustomType1 t2 = new CustomType1(); + t2.key = "2"; + t2.id = "id2"; + t2.name = "name2"; + store.write(t2); + + assertEquals(t1.id, store.view(t1.getClass()).sorted().iterator().next().id); + assertEquals(t2.id, store.view(t1.getClass()).sorted().skip(1).iterator().next().id); + assertEquals(t2.id, store.view(t1.getClass()).sorted().skip(1).max(1).iterator().next().id); + assertEquals(t1.id, + store.view(t1.getClass()).sorted().first(t1.key).max(1).iterator().next().id); + assertEquals(t2.id, + store.view(t1.getClass()).sorted().first(t2.key).max(1).iterator().next().id); + assertFalse(store.view(t1.getClass()).sorted().first(t2.id).skip(1).iterator().hasNext()); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java index 5d40309ab203b..79bf4e833ff4e 100644 --- a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java @@ -264,28 +264,4 @@ public int hashCode() { } - public static class ArrayKeyIndexType { - - @KVIndex - public int[] key; - - @KVIndex("id") - public String[] id; - - @Override - public boolean equals(Object o) { - if (o instanceof ArrayKeyIndexType) { - ArrayKeyIndexType other = (ArrayKeyIndexType) o; - return Arrays.equals(key, 
other.key) && Arrays.equals(id, other.id); - } - return false; - } - - @Override - public int hashCode() { - return key.hashCode(); - } - - } - } diff --git a/core/pom.xml b/core/pom.xml index 7f245b5b6384a..a98f5da2ff407 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -67,6 +67,11 @@ spark-launcher_${scala.binary.version} ${project.version} + + org.apache.spark + spark-kvstore_${scala.binary.version} + ${project.version} + org.apache.spark spark-network-common_${scala.binary.version} diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 1f89306403cd5..baa9363cf0902 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -188,7 +188,7 @@ $(document).ready(function() { } $(selector).DataTable(conf); - $('#hisotry-summary [data-toggle="tooltip"]').tooltip(); + $('#history-summary [data-toggle="tooltip"]').tooltip(); }); }); }); diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 6d8758a3d3b1d..e25522a28c577 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -68,11 +68,19 @@ private[history] abstract class HistoryUpdateProbe { * @param ui Spark UI * @param updateProbe probe to call to check on the update state of this application attempt */ -private[history] case class LoadedAppUI( +private[spark] case class LoadedAppUI( ui: SparkUI, updateProbe: () => Boolean) -private[history] abstract class ApplicationHistoryProvider { +private[spark] abstract class ApplicationHistoryProvider { + + /** + * The number of applications available for listing. Separate method in case it's cheaper + * to get a count than to calculate the whole listing. + * + * @return The number of available applications. + */ + def getAppCount(): Int = getListing().size /** * Returns the count of application event logs that the provider is currently still processing. 
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f4235df245128..4c0252d8e991a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,14 +17,17 @@ package org.apache.spark.deploy.history -import java.io.{FileNotFoundException, IOException, OutputStream} -import java.util.UUID +import java.io.{File, FileNotFoundException, IOException, OutputStream} +import java.util.{Date, UUID} import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -35,9 +38,12 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging +import org.apache.spark.kvstore._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -78,6 +84,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) this(conf, new SystemClock()) } + import config._ import FsHistoryProvider._ // Interval between safemode checks. @@ -94,8 +101,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS, Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) - private val logDir = conf.getOption("spark.history.fs.logDirectory") - .getOrElse(DEFAULT_LOG_DIR) + private val logDir = conf.get(EVENT_LOG_DIR) private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false) private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "") @@ -117,17 +123,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) - // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted - // into the map in order, so the LinkedHashMap maintains the correct ordering. - @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] - = new mutable.LinkedHashMap() + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]() + private val storePath = conf.get(LOCAL_STORE_DIR) - // List of application logs to be deleted by event log cleaner. 
- private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + private val listing = storePath.map { path => + val dbPath = new File(path, "listing.ldb") - private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) + def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) + + try { + val db = openDB() + val meta = db.getMetadata(classOf[KVStoreMetadata]) + + if (meta == null) { + db.setMetadata(new KVStoreMetadata(CURRENT_VERSION, logDir.toString())) + db + } else if (meta.version != CURRENT_VERSION || !logDir.toString().equals(meta.logDir)) { + logInfo("Detected mismatched config in existing DB, deleting...") + db.close() + Utils.deleteRecursively(dbPath) + openDB() + } else { + db + } + } catch { + case _: UnsupportedStoreVersionException => + logInfo("Detected incompatible DB versions, deleting...") + Utils.deleteRecursively(dbPath) + openDB() + } + }.getOrElse(new InMemoryStore()) /** * Return a runnable that performs the given operation on the event logs. @@ -229,10 +255,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator + override def getListing(): Iterator[ApplicationHistoryInfo] = { + listing.view(classOf[ApplicationInfoWrapper]) + .index("endTime") + .reverse() + .iterator() + .asScala + .map(_.toAppHistoryInfo) + } - override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = { - applications.get(appId) + override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = { + try { + Some(load(appId).toAppHistoryInfo()) + } catch { + case e: NoSuchElementException => + None + } } override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() @@ -241,40 +279,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts + .find { attempt => attempt.info.attemptId == attemptId } + .map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime()) // Do not call ui.bind() to avoid creating a new server for each application } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - 
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) - } else { - None - } - + ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") + ui.getSecurityManager.setAdminAcls(adminAcls) + ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, + appListener.viewAcls.getOrElse("")) + val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + appListener.adminAclsGroups.getOrElse("") + ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) + ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) + LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) } - } } catch { - case e: FileNotFoundException => None + case _: NoSuchElementException => None } } @@ -299,6 +334,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def stop(): Unit = { + listing.close() if (initThread != null && initThread.isAlive()) { initThread.interrupt() initThread.join() @@ -314,24 +350,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) - .getOrElse(Seq[FileStatus]()) + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList + val logInfos = statusList .filter { entry => - val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. 
!entry.getPath().getName().startsWith(".") && - prevFileSize < entry.getLen() && - SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) + SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && + recordedFileSize(entry.getPath()) < entry.getLen() } .flatMap { entry => Some(entry) } .sortWith { case (entry1, entry2) => - entry1.getModificationTime() >= entry2.getModificationTime() - } + entry1.getModificationTime() > entry2.getModificationTime() + } if (logInfos.nonEmpty) { logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") @@ -419,205 +453,99 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - applications.get(appId) match { - case Some(appInfo) => - try { - // If no attempt is specified, or there is no attemptId for attempts, return all attempts - appInfo.attempts.filter { attempt => - attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get - }.foreach { attempt => - val logPath = new Path(logDir, attempt.logPath) - zipFileToStream(logPath, attempt.logPath, zipStream) - } - } finally { - zipStream.close() + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + throw new SparkException(s"Logs for $appId not found.") + } + + try { + // If no attempt is specified, or there is no attemptId for attempts, return all attempts + attemptId + .map { id => app.attempts.filter(_.info.attemptId == Some(id)) } + .getOrElse(app.attempts) + .map(_.logPath) + .foreach { log => + zipFileToStream(new Path(logDir, log), log, zipStream) } - case None => throw new SparkException(s"Logs for $appId not found.") + } finally { + zipStream.close() } } /** - * Replay the log files in the list and merge the list of old applications with new ones + * Replay the given log file, saving the application in the listing db. */ protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { - val newAttempts = try { - val eventsFilter: ReplayEventsFilter = { eventString => - eventString.startsWith(APPL_START_EVENT_PREFIX) || - eventString.startsWith(APPL_END_EVENT_PREFIX) - } - - val logPath = fileStatus.getPath() - val appCompleted = isApplicationCompleted(fileStatus) - - // Use loading time as lastUpdated since some filesystems don't update modifiedTime - // each time file is updated. However use modifiedTime for completed jobs so lastUpdated - // won't change whenever HistoryServer restarts and reloads the file. - val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis() - - val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) - - // Without an app ID, new logs will render incorrectly in the listing page, so do not list or - // try to show their UI. - if (appListener.appId.isDefined) { - val attemptInfo = new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - lastUpdated, - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted, - fileStatus.getLen() - ) - fileToAppInfo(logPath) = attemptInfo - logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") - Some(attemptInfo) - } else { - logWarning(s"Failed to load application log ${fileStatus.getPath}. 
" + - "The application may have not started.") - None - } - - } catch { - case e: Exception => - logError( - s"Exception encountered when attempting to load application log ${fileStatus.getPath}", - e) - None + val eventsFilter: ReplayEventsFilter = { eventString => + eventString.startsWith(APPL_START_EVENT_PREFIX) || + eventString.startsWith(APPL_END_EVENT_PREFIX) } - if (newAttempts.isEmpty) { - return - } - - // Build a map containing all apps that contain new attempts. The app information in this map - // contains both the new app attempt, and those that were already loaded in the existing apps - // map. If an attempt has been updated, it replaces the old attempt in the list. - val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]() - - applications.synchronized { - newAttempts.foreach { attempt => - val appInfo = newAppMap.get(attempt.appId) - .orElse(applications.get(attempt.appId)) - .map { app => - val attempts = - app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt) - new FsApplicationHistoryInfo(attempt.appId, attempt.name, - attempts.sortWith(compareAttemptInfo)) - } - .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt))) - newAppMap(attempt.appId) = appInfo - } - - // Merge the new app list with the existing one, maintaining the expected ordering (descending - // end time). Maintaining the order is important to avoid having to sort the list every time - // there is a request for the log list. - val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo) - val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { - if (!mergedApps.contains(info.id)) { - mergedApps += (info.id -> info) - } - } + val logPath = fileStatus.getPath() + logInfo(s"Replaying log path: $logPath") - val newIterator = newApps.iterator.buffered - val oldIterator = applications.values.iterator.buffered - while (newIterator.hasNext && oldIterator.hasNext) { - if (newAppMap.contains(oldIterator.head.id)) { - oldIterator.next() - } else if (compareAppInfo(newIterator.head, oldIterator.head)) { - addIfAbsent(newIterator.next()) - } else { - addIfAbsent(oldIterator.next()) - } - } - newIterator.foreach(addIfAbsent) - oldIterator.foreach(addIfAbsent) + val bus = new ReplayListenerBus() + val listener = new AppListingListener(fileStatus, clock) + bus.addListener(listener) - applications = mergedApps - } + replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) + listener.applicationInfo.foreach(addListing) + listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) } /** * Delete event logs from the log directory according to the clean policy defined by the user. */ private[history] def cleanLogs(): Unit = { + var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None try { - val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000 - - val now = clock.getTimeMillis() - val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - - def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = { - now - attempt.lastUpdated > maxAge - } - - // Scan all logs from the log directory. - // Only completed applications older than the specified max age will be deleted. 
- applications.values.foreach { app => - val (toClean, toRetain) = app.attempts.partition(shouldClean) - attemptsToClean ++= toClean - - if (toClean.isEmpty) { - appsToRetain += (app.id -> app) - } else if (toRetain.nonEmpty) { - appsToRetain += (app.id -> - new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList)) + val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + + // Iterate descending over all applications whose oldest attempt is older than the maxAge. + iterator = Some(listing.view(classOf[ApplicationInfoWrapper]) + .index("oldestAttempt") + .reverse() + .first(maxTime) + .closeableIterator()) + + iterator.get.asScala.foreach { app => + val (remaining, toDelete) = app.attempts.partition { attempt => + attempt.info.lastUpdated.getTime() >= maxTime + } + if (remaining.nonEmpty) { + val newApp = new ApplicationInfoWrapper(app.info, remaining) + listing.write(newApp) + } else { + listing.delete(app.getClass(), app.id) } - } - - applications = appsToRetain - val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] - attemptsToClean.foreach { attempt => - try { - fs.delete(new Path(logDir, attempt.logPath), true) - } catch { - case e: AccessControlException => - logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") - case t: IOException => - logError(s"IOException in cleaning ${attempt.logPath}", t) - leftToClean += attempt + toDelete.foreach { attempt => + val logPath = new Path(logDir, attempt.logPath) + try { + listing.delete(classOf[LogInfo], logPath.toString()) + } catch { + case _: NoSuchElementException => + logDebug(s"Log info entry for $logPath not found.") + } + try { + fs.delete(logPath, true) + } catch { + case e: AccessControlException => + logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") + case t: IOException => + logError(s"IOException in cleaning ${attempt.logPath}", t) + } } } - - attemptsToClean = leftToClean } catch { - case t: Exception => logError("Exception in cleaning logs", t) + case t: Exception => logError("Exception while cleaning logs", t) + } finally { + iterator.foreach(_.close()) } } - /** - * Comparison function that defines the sort order for the application listing. - * - * @return Whether `i1` should precede `i2`. - */ - private def compareAppInfo( - i1: FsApplicationHistoryInfo, - i2: FsApplicationHistoryInfo): Boolean = { - val a1 = i1.attempts.head - val a2 = i2.attempts.head - if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime - } - - /** - * Comparison function that defines the sort order for application attempts within the same - * application. Order is: attempts are sorted by descending start time. - * Most recent attempt state matches with current state of the app. - * - * Normally applications should have a single running attempt; but failure to call sc.stop() - * may cause multiple running attempts to show up. - * - * @return Whether `a1` should precede `a2`. - */ - private def compareAttemptInfo( - a1: FsApplicationAttemptInfo, - a2: FsApplicationAttemptInfo): Boolean = { - a1.startTime >= a2.startTime - } - /** * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns * an `ApplicationEventListener` instance with event data captured from the replay. 
@@ -642,6 +570,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appListener = new ApplicationEventListener bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) + logInfo(s"Finished replaying $logPath") appListener } finally { logInput.close() @@ -678,26 +607,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * @return a summary of the component state */ override def toString: String = { - val header = s""" - | FsHistoryProvider: logdir=$logDir, - | last scan time=$lastScanTime - | Cached application count =${applications.size}} - """.stripMargin - val sb = new StringBuilder(header) - applications.foreach(entry => sb.append(entry._2).append("\n")) - sb.toString - } - - /** - * Look up an application attempt - * @param appId application ID - * @param attemptId Attempt ID, if set - * @return the matching attempt, if found - */ - def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId) - } + val count = listing.count(classOf[ApplicationInfoWrapper]) + s"""|FsHistoryProvider{logdir=$logDir, + | storedir=$storePath, + | last scan time=$lastScanTime + | application count=$count}""".stripMargin } /** @@ -715,19 +629,64 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { - lookup(appId, attemptId) match { - case None => - logDebug(s"Application Attempt $appId/$attemptId not found") - false - case Some(latest) => - prevFileSize < latest.fileSize + try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize + } catch { + case _: NoSuchElementException => false + } + } + + private def recordedFileSize(log: Path): Long = { + try { + listing.read(classOf[LogInfo], log.toString()).fileSize + } catch { + case _: NoSuchElementException => 0L + } + } + + private def load(appId: String): ApplicationInfoWrapper = { + listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { + val attempt = app.attempts.head + + val oldApp = try { + listing.read(classOf[ApplicationInfoWrapper], app.id) + } catch { + case _: NoSuchElementException => + app + } + + def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { + a1.info.startTime.getTime() > a2.info.startTime.getTime() } + + val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ + List(attempt) + val oldestAttempt = attempts.map(_.info.lastUpdated.getTime()).min + + val newAppInfo = new ApplicationInfoWrapper( + app.info, + attempts.sortWith(compareAttemptInfo)) + listing.write(newAppInfo) + } + + /** For testing. Returns internal data about a single attempt. 
*/ + private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { + load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( + throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId.")) } + } private[history] object FsHistoryProvider { - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" - private val NOT_STARTED = "" private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" @@ -735,52 +694,137 @@ private[history] object FsHistoryProvider { private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" + + private val CURRENT_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - * the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +case class KVStoreMetadata( + val version: Long, + val logDir: String) + +case class LogInfo( + @KVIndexParam val logPath: String, + val fileSize: Long) + +private[history] class AttemptInfoWrapper( + val info: v1.ApplicationAttemptInfo, val logPath: String, - val name: String, - val appId: String, - attemptId: Option[String], - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String, - completed: Boolean, - val fileSize: Long) - extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed) { - - /** extend the superclass string value with the extra attributes of this class */ - override def toString: String = { - s"FsApplicationAttemptInfo($name, $appId," + - s" ${super.toString}, source=$logPath, size=$fileSize" + val fileSize: Long) { + + def toAppAttemptInfo(): ApplicationAttemptInfo = { + ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), + info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, + info.completed) } + } -/** - * Application history information - * @param id application ID - * @param name application name - * @param attempts list of attempts, most recent first. 
- */ -private class FsApplicationHistoryInfo( - id: String, - override val name: String, - override val attempts: List[FsApplicationAttemptInfo]) - extends ApplicationHistoryInfo(id, name, attempts) +private[history] class ApplicationInfoWrapper( + val info: v1.ApplicationInfo, + val attempts: List[AttemptInfoWrapper]) { + + @JsonIgnore @KVIndexParam + def id: String = info.id + + @JsonIgnore @KVIndexParam("endTime") + def endTime(): Long = attempts.head.info.endTime.getTime() + + @JsonIgnore @KVIndexParam("oldestAttempt") + def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min + + def toAppHistoryInfo(): ApplicationHistoryInfo = { + ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) + } + + def toApiInfo(): v1.ApplicationInfo = { + new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores, + info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info)) + } + +} + +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { + + private val app = new MutableApplicationInfo() + private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + app.id = event.appId.orNull + app.name = event.appName + + attempt.attemptId = event.appAttemptId + attempt.startTime = new Date(event.time) + attempt.lastUpdated = new Date(clock.getTimeMillis()) + attempt.sparkUser = event.sparkUser + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + attempt.endTime = new Date(event.time) + attempt.lastUpdated = new Date(log.getModificationTime()) + attempt.duration = event.time - attempt.startTime.getTime() + attempt.completed = true + } + + def applicationInfo: Option[ApplicationInfoWrapper] = { + if (app.id != null) { + Some(app.toView(List(attempt.toView()))) + } else { + None + } + } + + private class MutableApplicationInfo { + var id: String = null + var name: String = null + var coresGranted: Option[Int] = None + var maxCores: Option[Int] = None + var coresPerExecutor: Option[Int] = None + var memoryPerExecutorMB: Option[Int] = None + + def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper = { + val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor, + memoryPerExecutorMB, Nil) + new ApplicationInfoWrapper(apiInfo, attempts) + } + + } + + private class MutableAttemptInfo(logPath: String, fileSize: Long) { + var attemptId: Option[String] = None + var startTime = new Date(-1) + var endTime = new Date(-1) + var lastUpdated = new Date(-1) + var duration = 0L + var sparkUser: String = null + var completed = false + + def toView(): AttemptInfoWrapper = { + val apiInfo = new v1.ApplicationAttemptInfo( + attemptId, + startTime, + endTime, + lastUpdated, + duration, + sparkUser, + completed) + new AttemptInfoWrapper( + apiInfo, + logPath, + fileSize) + } + + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index d9c8fda99ef97..949b307820218 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -325,7 +325,7 @@ object HistoryServer extends Logging { } } - private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = { + private[spark] def getAttemptURI(appId: String, 
attemptId: Option[String]): String = { val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("") s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}" } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala new file mode 100644 index 0000000000000..6d4b3712ca5b4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") + .stringConf + .createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") + .stringConf + .createOptional + +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 56d8e51732ffd..d66117410f2c5 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -39,9 +39,21 @@ class ApplicationAttemptInfo private[spark]( val duration: Long, val sparkUser: String, val completed: Boolean = false) { - def getStartTimeEpoch: Long = startTime.getTime - def getEndTimeEpoch: Long = endTime.getTime - def getLastUpdatedEpoch: Long = lastUpdated.getTime + + def getStartTimeEpoch: Long = startTime.getTime + + def getEndTimeEpoch: Long = endTime.getTime + + def getLastUpdatedEpoch: Long = lastUpdated.getTime + + // These setter methods are here for Jackson, since different parts of the code try to create + // instances of this class from serialized JSON and fail if these are not present. 
+ + private def setStartTimeEpoch(unused: Long): Unit = { } + + private def setEndTimeEpoch(unused: Long): Unit = { } + + private def setLastUpdatedEpoch(unused: Long): Unit = { } } class ExecutorStageSummary private[spark]( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 456158d41b93f..483bd648ddb87 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -37,6 +37,7 @@ import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.io._ import org.apache.spark.scheduler._ @@ -55,6 +56,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc Utils.deleteRecursively(testDir) } + private def newProvider( + conf: SparkConf, + clock: Clock = null): FsHistoryProvider = { + if (clock == null) { + new FsHistoryProvider(conf) + } else { + new FsHistoryProvider(conf, clock) + } + } + /** Create a fake log file using the new log format used in Spark 1.3+ */ private def newLogFile( appId: String, @@ -69,7 +80,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("Parse application logs") { val clock = new ManualClock(12345678) - val provider = new FsHistoryProvider(createTestConf(), clock) + val provider = newProvider(createTestConf(), clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -164,7 +175,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("history file is renamed from inprogress to completed") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -173,20 +184,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should endWith(EventLoggingListener.IN_PROGRESS) } logFile1.renameTo(newLogFile("app1", None, inProgress = false)) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should not endWith(EventLoggingListener.IN_PROGRESS) } } test("Parse logs that application is not started") { - val provider = new FsHistoryProvider((createTestConf())) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -198,7 +207,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-5582: empty log directory") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -214,7 +223,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter 
with Matc } test("apps with multiple attempts with order") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, true, None, @@ -275,7 +284,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("log cleaner") { val maxAge = TimeUnit.SECONDS.toMillis(10) val clock = new ManualClock(maxAge / 2) - val provider = new FsHistoryProvider( + val provider = newProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) @@ -321,7 +330,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20) val maxAge = TimeUnit.SECONDS.toMillis(40) val clock = new ManualClock(0) - val provider = new FsHistoryProvider( + val provider = newProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("inProgressApp1", None, inProgress = true) @@ -343,23 +352,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.checkForLogs() // This should not trigger any cleanup - updateAndCheck(provider)(list => list.size should be(2)) + updateAndCheck(provider) { list => + list.size should be(2) + } // Should trigger cleanup for first file but not second one clock.setTime(firstFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(1)) + updateAndCheck(provider) { list => + list.size should be(1) + } assert(!log1.exists()) assert(log2.exists()) // Should cleanup the second file as well. clock.setTime(secondFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(0)) + updateAndCheck(provider) { list => + list.size should be(0) + } assert(!log1.exists()) assert(!log2.exists()) } test("Event log copy") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logs = (1 to 2).map { i => val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) writeFile(log, true, None, @@ -394,7 +409,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-8372: new logs with no app ID are ignored") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) // Write a new log file without an app id, to make sure it's ignored. val logFile1 = newLogFile("app1", None, inProgress = true) @@ -408,7 +423,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("provider correctly checks whether fs is in safe mode") { - val provider = spy(new FsHistoryProvider(createTestConf())) + val provider = spy(newProvider(createTestConf())) val dfs = mock(classOf[DistributedFileSystem]) // Asserts that safe mode is false because we can't really control the return value of the mock, // since the API is different between hadoop 1 and 2. 
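Aside: the suite changes above stop casting list entries to FsApplicationAttemptInfo and instead go through provider.getAttempt, because attempt data now lives in the KVStore-backed wrappers (ApplicationInfoWrapper, AttemptInfoWrapper, LogInfo) that AppListingListener builds while an event log is replayed. Below is a rough sketch of that listing flow, assuming code sitting in the org.apache.spark.deploy.history package; the ReplayListenerBus wiring and the explicit KVStore handle are illustrative assumptions, not the provider's exact replay path.

// Sketch only (assumed wiring): turning one event log into listing records.
import java.io.FileInputStream

import org.apache.hadoop.fs.FileStatus

import org.apache.spark.kvstore.KVStore
import org.apache.spark.scheduler.ReplayListenerBus
import org.apache.spark.util.SystemClock

def listSingleLog(store: KVStore, status: FileStatus): Unit = {
  val listener = new AppListingListener(status, new SystemClock())
  val bus = new ReplayListenerBus()
  bus.addListener(listener)

  val in = new FileInputStream(status.getPath().toUri().getPath())
  try {
    // Replay so the listener observes SparkListenerApplicationStart/End.
    bus.replay(in, status.getPath().getName(), false)
  } finally {
    in.close()
  }

  // Logs that never reported an application ID produce no listing entry.
  listener.applicationInfo.foreach { app =>
    store.write(app)                                   // indexed by id, endTime, oldestAttempt
    store.write(LogInfo(status.getPath().toString(), status.getLen()))
  }
}

Keeping LogInfo (log path plus file size) separate from the application wrappers appears to be what gives later scans a cheap way to detect unchanged logs, which would explain why fileSize is carried on both records.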
@@ -480,7 +495,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc SparkListenerApplicationEnd(5L) ) - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) updateAndCheck(provider) { list => list.size should be (1) list(0).name should be ("real-app") @@ -497,7 +512,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc var provider: FsHistoryProvider = null try { - provider = new FsHistoryProvider(conf) + provider = newProvider(conf) val log = newLogFile("app1", Some("attempt1"), inProgress = false) writeFile(log, true, None, SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), @@ -625,7 +640,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } private def createTestConf(): SparkConf = { - new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath()) } private class SafeModeTestProvider(conf: SparkConf, clock: Clock) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 95acb9a54440f..4277b82faa277 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -43,6 +43,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.selenium.WebBrowser import org.apache.spark._ +import org.apache.spark.deploy.history.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.JobUIData import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -64,6 +65,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private val logDir = getTestResourcePath("spark-events") private val expRoot = getTestResourceFile("HistoryServerExpectations") + private val storeDir = Utils.createTempDir(namePrefix = "history") private var provider: FsHistoryProvider = null private var server: HistoryServer = null @@ -74,6 +76,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) conf.setAll(extraConf) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -87,14 +90,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers def stop(): Unit = { server.stop() + server = null } before { - init() - } - - after{ - stop() + if (server == null) { + init() + } } val cases = Seq( @@ -296,6 +298,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -372,6 +375,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { + server.stop() implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -388,6 +392,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers .set("spark.history.fs.update.interval", "1s") .set("spark.eventLog.enabled", "true") .set("spark.history.cache.window", "250ms") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .remove("spark.testing") val provider = new FsHistoryProvider(myConf) val securityManager = HistoryServer.createSecurityManager(myConf) @@ -413,8 +418,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - // stop the server with the old config, and start the new one - server.stop() server = new HistoryServer(myConf, provider, securityManager, 18080) server.initialize() server.bind() diff --git a/docs/monitoring.md b/docs/monitoring.md index 3e577c5f36778..6bbd3e45be54e 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -220,6 +220,13 @@ The history server can be configured as follows: Number of threads that will be used by history server to process event logs. </td> </tr> + <tr> + <td>spark.history.store.path</td> + <td>/var/lib/spark-history</td> + <td> + Local directory where the history server will cache application history data. + </td> + </tr> </table> Note that in all of these UIs, the tables are sortable by clicking their headers, diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 6c0c3ebcaebf4..c71a71bbdd30b 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -144,6 +144,7 @@ List<String> buildClassPath(String appClassPath) throws IOException { if (prependClasses || isTesting) { String scala = getScalaVersion(); List<String> projects = Arrays.asList( + "common/kvstore", "common/network-common", "common/network-shuffle", "common/network-yarn", @@ -162,7 +163,8 @@ List<String> buildClassPath(String appClassPath) throws IOException { "sql/core", "sql/hive", "sql/hive-thriftserver", - "streaming" + "streaming", + "ui" ); if (prependClasses) { if (!isTesting) { diff --git a/pom.xml b/pom.xml index 6835ea14cd42b..ce17b2fb3f8f9 100644 --- a/pom.xml +++ b/pom.xml @@ -2038,6 +2038,7 @@ false false false + in-memory true true diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 89b0c7a3ab7b0..818d892c764ad 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -240,7 +240,8 @@ object SparkBuild extends PomBuild { javacOptions in Compile ++= Seq( "-encoding", "UTF-8", - "-source", javacJVMVersion.value + "-source", javacJVMVersion.value, + "-Xlint:unchecked" ), // This -target option cannot be set in the Compile configuration scope since `javadoc` doesn't // play nicely with it; see https://github.com/sbt/sbt/issues/355#issuecomment-3817629 for @@ -785,6 +786,7 @@ object TestSettings { javaOptions in Test += "-Dspark.ui.enabled=false", javaOptions in Test += "-Dspark.ui.showConsoleProgress=false", javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true", + javaOptions in Test += "-Dspark.ui.stateStoreType=in-memory", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=false", javaOptions in Test += "-Dderby.system.durability=test", javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark")) diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 1f48d71cc7a2b..656c7614c2bfd 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -86,7 +86,7 @@ This file is divided into 3 sections: - +
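On the configuration side, the net effect of config.scala and the monitoring.md row above is a single new user-facing key, spark.history.store.path. A minimal, suite-style sketch of using it follows; the paths are placeholders and the code assumes the org.apache.spark.deploy.history package, as the tests above do.

// Illustrative only: enabling the local listing store for a history provider.
import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.FsHistoryProvider
import org.apache.spark.util.Utils

val storeDir = Utils.createTempDir(namePrefix = "history")

val conf = new SparkConf()
  .set("spark.history.fs.logDirectory", "file:/tmp/spark-events")   // EVENT_LOG_DIR default
  .set("spark.history.store.path", storeDir.getAbsolutePath())      // LOCAL_STORE_DIR, new here

val provider = new FsHistoryProvider(conf)
provider.checkForLogs()   // scan the event logs and populate the listing data

LOCAL_STORE_DIR is declared createOptional, and the pom.xml and SparkBuild.scala changes additionally pass -Dspark.ui.stateStoreType=in-memory to the test JVMs, presumably so tests keep using the in-memory store by default.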