From c4330652ba8a6d053828ba358deafe0ad87422ee Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 26 May 2017 11:52:08 -0700
Subject: [PATCH 1/3] SHS-NG M1: small fix in loop condition.

---
 .../test/java/org/apache/spark/kvstore/DBIteratorSuite.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
index 32489a2174c2..2b15e6e11234 100644
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
@@ -127,7 +127,9 @@ public void setup() throws Exception {
     // the same way the store is expected to.
     CustomType1 first = allEntries.get(0);
     clashingEntries = new ArrayList<>();
-    for (int i = 0; i < RND.nextInt(MIN_ENTRIES) + 1; i++) {
+
+    int clashCount = RND.nextInt(MIN_ENTRIES) + 1;
+    for (int i = 0; i < clashCount; i++) {
       CustomType1 t = new CustomType1();
       t.key = "n-key" + (count + i);
       t.id = first.id;

From 667baf29d10eac37687bb5a96aa05488908ffab1 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 26 Apr 2017 11:37:43 -0700
Subject: [PATCH 2/3] SHS-NG M1.2: In-memory KVStore implementation.

This change adds an in-memory implementation of KVStore that can be used by
the live UI. The implementation is definitely not optimized for either speed
or space, but it should be fast enough for use in the listener bus.

The change also slightly alters the semantics of KVStore views: by default
they no longer guarantee ordering, which makes it faster to iterate over data
in the in-memory store when trimming the number of elements (a feature that
will be useful later on). The disk store, by default, still returns elements
in sorted order.
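As a quick illustration of the view semantics described above, the sketch below shows how a caller might use the API that this patch exercises in its tests (InMemoryStore, KVStoreView#sorted(), KVStoreView#index(String)). It is not part of the patch itself; the Entry type and its field values are hypothetical stand-ins for the CustomType1 test class used later in the diff.

import org.apache.spark.kvstore.InMemoryStore;
import org.apache.spark.kvstore.KVIndex;
import org.apache.spark.kvstore.KVStore;

public class KVStoreViewExample {

  // Hypothetical value type; @KVIndex marks the natural key, @KVIndex("id") a secondary index.
  public static class Entry {
    @KVIndex public String key;
    @KVIndex("id") public String id;
  }

  public static void main(String[] args) throws Exception {
    KVStore store = new InMemoryStore();

    Entry e = new Entry();
    e.key = "k1";
    e.id = "id1";
    store.write(e);

    // Default view: the in-memory store does not guarantee any iteration order.
    for (Entry entry : store.view(Entry.class)) {
      System.out.println(entry.key);
    }

    // Sorted by the natural key; sorting must be configured before using first()/last().
    for (Entry entry : store.view(Entry.class).sorted().first("k1").max(10)) {
      System.out.println(entry.key);
    }

    // Sorted by the secondary "id" index instead of the natural key.
    for (Entry entry : store.view(Entry.class).index("id")) {
      System.out.println(entry.id);
    }
  }
}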
--- .../apache/spark/kvstore/GenericArrayKey.java | 93 +++++ .../apache/spark/kvstore/InMemoryStore.java | 331 ++++++++++++++++++ .../org/apache/spark/kvstore/KVStoreView.java | 16 +- .../org/apache/spark/kvstore/KVTypeInfo.java | 2 +- .../apache/spark/kvstore/LevelDBIterator.java | 2 +- .../spark/kvstore/ArrayKeyIndexType.java | 44 +++ .../apache/spark/kvstore/DBIteratorSuite.java | 16 +- .../spark/kvstore/GenericArrayKeySuite.java | 59 ++++ .../spark/kvstore/InMemoryIteratorSuite.java | 27 ++ .../spark/kvstore/InMemoryStoreSuite.java | 161 +++++++++ .../apache/spark/kvstore/LevelDBSuite.java | 24 -- pom.xml | 1 + project/SparkBuild.scala | 4 +- 13 files changed, 744 insertions(+), 36 deletions(-) create mode 100644 common/kvstore/src/main/java/org/apache/spark/kvstore/GenericArrayKey.java create mode 100644 common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java create mode 100644 common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java create mode 100644 common/kvstore/src/test/java/org/apache/spark/kvstore/GenericArrayKeySuite.java create mode 100644 common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java create mode 100644 common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/GenericArrayKey.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/GenericArrayKey.java new file mode 100644 index 000000000000..e5594686c83d --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/GenericArrayKey.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; + +import com.google.common.base.Objects; + +/** + * A class that wraps a generic array so that it can be used as a key in a map, sorted or not. + * + * The implementation of {@link #compareTo(GenericArrayKey)} makes two assumptions: + * - All elements are instances of Comparable + * - When comparing two arrays, they both contain elements of the same type in corresponding + * indices. + * + * Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays. + * + * This class is not efficient and is mostly meant to compare really small arrays, like those + * generally used as indices and keys in a KVStore. 
+ */ +public class GenericArrayKey implements Comparable { + + private final Object key; + + public GenericArrayKey(Object key) { + this.key = key; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof GenericArrayKey)) { + return false; + } + + Object comp = ((GenericArrayKey) other).key; + int l1 = Array.getLength(key); + int l2 = Array.getLength(comp); + + if (l1 != l2) { + return false; + } + + for (int i = 0; i < l1; i++) { + if (!Objects.equal(Array.get(key, i), Array.get(comp, i))) { + return false; + } + } + + return true; + } + + @Override + public int hashCode() { + int code = 0; + int length = Array.getLength(key); + for (int i = 0; i < length; i++) { + code += Array.get(key, i).hashCode(); + } + return 31 * code; + } + + @Override + @SuppressWarnings("unchecked") + public int compareTo(GenericArrayKey other) { + int len = Math.min(Array.getLength(key), Array.getLength(other.key)); + for (int i = 0; i < len; i++) { + int diff = ((Comparable) Array.get(key, i)).compareTo( + (Comparable) Array.get(other.key, i)); + if (diff != 0) { + return diff; + } + } + + return Array.getLength(key) - Array.getLength(other.key); + } + +} diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java new file mode 100644 index 000000000000..78b71a0539db --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +/** + * Implementation of KVStore that keeps data deserialized in memory. This store does not index + * data; instead, whenever iterating over an indexed field, the stored data is copied and sorted + * according to the index. This saves memory but makes iteration more expensive. 
+ */ +public class InMemoryStore implements KVStore { + + private Object metadata; + private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); + + @Override + public T getMetadata(Class klass) { + return klass.cast(metadata); + } + + @Override + public void setMetadata(Object value) { + this.metadata = value; + } + + @Override + public long count(Class type) { + InstanceList list = data.get(type); + return list != null ? list.size() : 0; + } + + @Override + public long count(Class type, String index, Object indexedValue) throws Exception { + InstanceList list = data.get(type); + int count = 0; + for (Object o : view(type)) { + if (Objects.equal(indexedValue, list.getIndexValue(o, index))) { + count++; + } + } + return count; + } + + @Override + public T read(Class klass, Object naturalKey) { + InstanceList list = data.get(klass); + Object value = list != null ? list.get(naturalKey) : null; + if (value == null) { + throw new NoSuchElementException(); + } + return klass.cast(value); + } + + @Override + public void write(Object value) throws Exception { + InstanceList list = data.computeIfAbsent(value.getClass(), key -> { + try { + return new InstanceList(key); + } catch (Exception e) { + throw Throwables.propagate(e); + } + }); + list.put(value); + } + + @Override + public void delete(Class type, Object naturalKey) { + InstanceList list = data.get(type); + if (list != null) { + list.delete(naturalKey); + } + } + + @Override + public KVStoreView view(Class type){ + InstanceList list = data.get(type); + return list != null ? list.view(type) + : new InMemoryView<>(type, Collections.emptyList(), null); + } + + @Override + public void close() { + metadata = null; + data.clear(); + } + + @SuppressWarnings("unchecked") + private static Comparable asKey(Object in) { + if (in.getClass().isArray()) { + in = new GenericArrayKey(in); + } + return (Comparable) in; + } + + private static class InstanceList { + + private final KVTypeInfo ti; + private final KVTypeInfo.Accessor naturalKey; + private final ConcurrentMap, Object> data; + + private int size; + + private InstanceList(Class type) throws Exception { + this.ti = new KVTypeInfo(type); + this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); + this.data = new ConcurrentHashMap<>(); + this.size = 0; + } + + Object getIndexValue(Object o, String indexName) throws Exception { + return ti.getAccessor(indexName).get(o); + } + + public Object get(Object key) { + return data.get(asKey(key)); + } + + public void put(Object value) throws Exception { + Preconditions.checkArgument(ti.type().equals(value.getClass()), + "Unexpected type: %s", value.getClass()); + if (data.put(asKey(naturalKey.get(value)), value) == null) { + size++; + } + } + + public void delete(Object key) { + if (data.remove(asKey(key)) != null) { + size--; + } + } + + public int size() { + return size; + } + + @SuppressWarnings("unchecked") + public InMemoryView view(Class type) { + Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type); + Collection all = (Collection) data.values(); + return new InMemoryView(type, all, ti); + } + + } + + private static class InMemoryView extends KVStoreView { + + private final Collection elements; + private final KVTypeInfo ti; + private final KVTypeInfo.Accessor natural; + + InMemoryView(Class type, Collection elements, KVTypeInfo ti) { + super(type); + this.elements = elements; + this.ti = ti; + this.natural = ti != null ? 
ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null; + } + + @Override + public Iterator iterator() { + if (elements.isEmpty()) { + return new InMemoryIterator<>(elements.iterator()); + } + + Preconditions.checkArgument(index != null || first == null, + "First requires sorting to be configured for the view."); + Preconditions.checkArgument(index != null || last == null, + "Last requires sorting to be configured for the view."); + + try { + KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null; + int modifier = ascending ? 1 : -1; + + final Collection iterable; + if (index != null) { + List copy = copyElements(); + Collections.sort(copy, (e1, e2) -> modifier * compare(e1, e2, getter)); + iterable = copy; + } else { + iterable = this.elements; + } + + Stream stream = iterable.stream(); + + if (first != null) { + stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0); + } + + if (last != null) { + stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0); + } + + if (skip > 0) { + stream = stream.skip(skip); + } + + if (max < iterable.size()) { + stream = stream.limit((int) max); + } + + return new InMemoryIterator<>(stream.iterator()); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + /** + * Create a copy of the input elements, filtering the values for child indices if needed. + */ + private List copyElements() { + if (parent != null) { + KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); + Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); + + return elements.stream() + .filter(e -> compare(e, parentGetter, parent) == 0) + .collect(Collectors.toList()); + } else { + return new ArrayList<>(elements); + } + } + + private int compare(T e1, T e2, KVTypeInfo.Accessor getter) { + try { + int diff = compare(e1, getter, getter.get(e2)); + if (diff == 0 && getter != natural) { + diff = compare(e1, natural, natural.get(e2)); + } + return diff; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) { + try { + return asKey(getter.get(e1)).compareTo(asKey(v2)); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + } + + private static class InMemoryIterator implements KVStoreIterator { + + private final Iterator iter; + + InMemoryIterator(Iterator iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public T next() { + return iter.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public List next(int max) { + List list = new ArrayList<>(max); + while (hasNext() && list.size() < max) { + list.add(next()); + } + return list; + } + + @Override + public boolean skip(long n) { + long skipped = 0; + while (skipped < n) { + if (hasNext()) { + next(); + skipped++; + } else { + return false; + } + } + + return true; + } + + @Override + public void close() { + // no op. 
+ } + + } + +} diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java index cc5874e95887..ebf5e2cd4d7b 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java @@ -35,13 +35,20 @@ * when used in a for loop that exhausts their contents, but when used manually, they need * to be closed explicitly unless all elements are read. *

+ *
+ *
+ * By default, views do not have a pre-defined ordering. To define an order, call either
+ * {@link #sorted()} or {@link #index(String)} to define the desired order of elements.
+ * Some features, such as defining the first and last element to be returned, require
+ * an order to be defined.
+ *

*/ public abstract class KVStoreView implements Iterable { final Class type; boolean ascending = true; - String index = KVIndex.NATURAL_INDEX_NAME; + String index = null; Object first = null; Object last = null; Object parent = null; @@ -60,6 +67,13 @@ public KVStoreView reverse() { return this; } + /** + * Sorts the view according to the natural index. + */ + public KVStoreView sorted() { + return index(KVIndex.NATURAL_INDEX_NAME); + } + /** * Iterates according to the given index. */ diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java index 19a559f90281..889d57718969 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java @@ -89,7 +89,7 @@ private void checkIndex(KVIndex idx, Map indices) { "Duplicate index %s for type %s.", idx.value(), type.getName()); } - public Class getType() { + public Class type() { return type; } diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java index 2143a0e87738..1a2696c7a1c1 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java @@ -52,7 +52,7 @@ class LevelDBIterator implements KVStoreIterator { this.it = db.db().iterator(); this.type = params.type; this.ti = db.getTypeInfo(type); - this.index = ti.index(params.index); + this.index = ti.index(params.index != null ? params.index : KVIndex.NATURAL_INDEX_NAME); this.max = params.max; Preconditions.checkArgument(!index.isChild() || params.parent != null, diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java new file mode 100644 index 000000000000..d5938acc3e80 --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.kvstore; + +import java.util.Arrays; + +public class ArrayKeyIndexType { + + @KVIndex + public int[] key; + + @KVIndex("id") + public String[] id; + + @Override + public boolean equals(Object o) { + if (o instanceof ArrayKeyIndexType) { + ArrayKeyIndexType other = (ArrayKeyIndexType) o; + return Arrays.equals(key, other.key) && Arrays.equals(id, other.id); + } + return false; + } + + @Override + public int hashCode() { + return key.hashCode(); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java index 2b15e6e11234..6684bf367e2d 100644 --- a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java @@ -156,7 +156,7 @@ public void setup() throws Exception { @Test public void naturalIndex() throws Exception { - testIteration(NATURAL_ORDER, view(), null, null); + testIteration(NATURAL_ORDER, view().sorted(), null, null); } @Test @@ -182,7 +182,7 @@ public void childIndex() throws Exception { @Test public void naturalIndexDescending() throws Exception { - testIteration(NATURAL_ORDER, view().reverse(), null, null); + testIteration(NATURAL_ORDER, view().sorted().reverse(), null, null); } @Test @@ -209,7 +209,7 @@ public void childIndexDescending() throws Exception { @Test public void naturalIndexWithStart() throws Exception { CustomType1 first = pickLimit(); - testIteration(NATURAL_ORDER, view().first(first.key), first, null); + testIteration(NATURAL_ORDER, view().sorted().first(first.key), first, null); } @Test @@ -240,7 +240,7 @@ public void childIndexWithStart() throws Exception { @Test public void naturalIndexDescendingWithStart() throws Exception { CustomType1 first = pickLimit(); - testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null); + testIteration(NATURAL_ORDER, view().sorted().reverse().first(first.key), first, null); } @Test @@ -270,7 +270,7 @@ public void childIndexDescendingWithStart() throws Exception { @Test public void naturalIndexWithSkip() throws Exception { - testIteration(NATURAL_ORDER, view().skip(pickCount()), null, null); + testIteration(NATURAL_ORDER, view().sorted().skip(pickCount()), null, null); } @Test @@ -292,7 +292,7 @@ public void childIndexWithSkip() throws Exception { @Test public void naturalIndexWithMax() throws Exception { - testIteration(NATURAL_ORDER, view().max(pickCount()), null, null); + testIteration(NATURAL_ORDER, view().sorted().max(pickCount()), null, null); } @Test @@ -310,7 +310,7 @@ public void childIndexWithMax() throws Exception { @Test public void naturalIndexWithLast() throws Exception { CustomType1 last = pickLimit(); - testIteration(NATURAL_ORDER, view().last(last.key), null, last); + testIteration(NATURAL_ORDER, view().sorted().last(last.key), null, last); } @Test @@ -341,7 +341,7 @@ public void childIndexWithLast() throws Exception { @Test public void naturalIndexDescendingWithLast() throws Exception { CustomType1 last = pickLimit(); - testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last); + testIteration(NATURAL_ORDER, view().sorted().reverse().last(last.key), null, last); } @Test diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/GenericArrayKeySuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/GenericArrayKeySuite.java new file mode 100644 index 000000000000..3f7f4299387e --- /dev/null +++ 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/GenericArrayKeySuite.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class GenericArrayKeySuite { + + @Test + public void testGenericArrayKey() { + byte[] b1 = new byte[] { 0x01, 0x02, 0x03 }; + byte[] b2 = new byte[] { 0x01, 0x02 }; + int[] i1 = new int[] { 1, 2, 3 }; + int[] i2 = new int[] { 1, 2 }; + String[] s1 = new String[] { "1", "2", "3" }; + String[] s2 = new String[] { "1", "2" }; + + assertEquals(new GenericArrayKey(b1), new GenericArrayKey(b1)); + assertNotEquals(new GenericArrayKey(b1), new GenericArrayKey(b2)); + assertNotEquals(new GenericArrayKey(b1), new GenericArrayKey(i1)); + assertNotEquals(new GenericArrayKey(b1), new GenericArrayKey(s1)); + + assertEquals(new GenericArrayKey(i1), new GenericArrayKey(i1)); + assertNotEquals(new GenericArrayKey(i1), new GenericArrayKey(i2)); + assertNotEquals(new GenericArrayKey(i1), new GenericArrayKey(b1)); + assertNotEquals(new GenericArrayKey(i1), new GenericArrayKey(s1)); + + assertEquals(new GenericArrayKey(s1), new GenericArrayKey(s1)); + assertNotEquals(new GenericArrayKey(s1), new GenericArrayKey(s2)); + assertNotEquals(new GenericArrayKey(s1), new GenericArrayKey(b1)); + assertNotEquals(new GenericArrayKey(s1), new GenericArrayKey(i1)); + + assertEquals(0, new GenericArrayKey(b1).compareTo(new GenericArrayKey(b1))); + assertTrue(new GenericArrayKey(b1).compareTo(new GenericArrayKey(b2)) > 0); + + assertEquals(0, new GenericArrayKey(i1).compareTo(new GenericArrayKey(i1))); + assertTrue(new GenericArrayKey(i1).compareTo(new GenericArrayKey(i2)) > 0); + + assertEquals(0, new GenericArrayKey(s1).compareTo(new GenericArrayKey(s1))); + assertTrue(new GenericArrayKey(s1).compareTo(new GenericArrayKey(s2)) > 0); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java new file mode 100644 index 000000000000..57ee4f6dd7cb --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +public class InMemoryIteratorSuite extends DBIteratorSuite { + + @Override + protected KVStore createStore() { + return new InMemoryStore(); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java new file mode 100644 index 000000000000..e9f00be8d4f0 --- /dev/null +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.util.NoSuchElementException; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class InMemoryStoreSuite { + + @Test + public void testObjectWriteReadDelete() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t = new CustomType1(); + t.key = "key"; + t.id = "id"; + t.name = "name"; + + try { + store.read(CustomType1.class, t.key); + fail("Expected exception for non-existant object."); + } catch (NoSuchElementException nsee) { + // Expected. + } + + store.write(t); + assertEquals(t, store.read(t.getClass(), t.key)); + assertEquals(1L, store.count(t.getClass())); + + store.delete(t.getClass(), t.key); + try { + store.read(t.getClass(), t.key); + fail("Expected exception for deleted object."); + } catch (NoSuchElementException nsee) { + // Expected. + } + } + + @Test + public void testMultipleObjectWriteReadDelete() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t1 = new CustomType1(); + t1.key = "key1"; + t1.id = "id"; + t1.name = "name1"; + + CustomType1 t2 = new CustomType1(); + t2.key = "key2"; + t2.id = "id"; + t2.name = "name2"; + + store.write(t1); + store.write(t2); + + assertEquals(t1, store.read(t1.getClass(), t1.key)); + assertEquals(t2, store.read(t2.getClass(), t2.key)); + assertEquals(2L, store.count(t1.getClass())); + + store.delete(t1.getClass(), t1.key); + assertEquals(t2, store.read(t2.getClass(), t2.key)); + store.delete(t2.getClass(), t2.key); + try { + store.read(t2.getClass(), t2.key); + fail("Expected exception for deleted object."); + } catch (NoSuchElementException nsee) { + // Expected. 
+ } + } + + @Test + public void testMetadata() throws Exception { + KVStore store = new InMemoryStore(); + assertNull(store.getMetadata(CustomType1.class)); + + CustomType1 t = new CustomType1(); + t.id = "id"; + t.name = "name"; + + store.setMetadata(t); + assertEquals(t, store.getMetadata(CustomType1.class)); + + store.setMetadata(null); + assertNull(store.getMetadata(CustomType1.class)); + } + + @Test + public void testUpdate() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t = new CustomType1(); + t.key = "key"; + t.id = "id"; + t.name = "name"; + + store.write(t); + + t.name = "anotherName"; + + store.write(t); + assertEquals(1, store.count(t.getClass())); + assertSame(t, store.read(t.getClass(), t.key)); + } + + @Test + public void testArrayIndices() throws Exception { + KVStore store = new InMemoryStore(); + + ArrayKeyIndexType o = new ArrayKeyIndexType(); + o.key = new int[] { 1, 2 }; + o.id = new String[] { "3", "4" }; + + store.write(o); + assertEquals(o, store.read(ArrayKeyIndexType.class, o.key)); + assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next()); + } + + @Test + public void testBasicIteration() throws Exception { + KVStore store = new InMemoryStore(); + + CustomType1 t1 = new CustomType1(); + t1.key = "1"; + t1.id = "id1"; + t1.name = "name1"; + store.write(t1); + + CustomType1 t2 = new CustomType1(); + t2.key = "2"; + t2.id = "id2"; + t2.name = "name2"; + store.write(t2); + + assertEquals(t1.id, store.view(t1.getClass()).sorted().iterator().next().id); + assertEquals(t2.id, store.view(t1.getClass()).sorted().skip(1).iterator().next().id); + assertEquals(t2.id, store.view(t1.getClass()).sorted().skip(1).max(1).iterator().next().id); + assertEquals(t1.id, + store.view(t1.getClass()).sorted().first(t1.key).max(1).iterator().next().id); + assertEquals(t2.id, + store.view(t1.getClass()).sorted().first(t2.key).max(1).iterator().next().id); + assertFalse(store.view(t1.getClass()).sorted().first(t2.id).skip(1).iterator().hasNext()); + } + +} diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java index 5d40309ab203..79bf4e833ff4 100644 --- a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java @@ -264,28 +264,4 @@ public int hashCode() { } - public static class ArrayKeyIndexType { - - @KVIndex - public int[] key; - - @KVIndex("id") - public String[] id; - - @Override - public boolean equals(Object o) { - if (o instanceof ArrayKeyIndexType) { - ArrayKeyIndexType other = (ArrayKeyIndexType) o; - return Arrays.equals(key, other.key) && Arrays.equals(id, other.id); - } - return false; - } - - @Override - public int hashCode() { - return key.hashCode(); - } - - } - } diff --git a/pom.xml b/pom.xml index 6835ea14cd42..ce17b2fb3f8f 100644 --- a/pom.xml +++ b/pom.xml @@ -2038,6 +2038,7 @@ false false false + in-memory true true diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 89b0c7a3ab7b..818d892c764a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -240,7 +240,8 @@ object SparkBuild extends PomBuild { javacOptions in Compile ++= Seq( "-encoding", "UTF-8", - "-source", javacJVMVersion.value + "-source", javacJVMVersion.value, + "-Xlint:unchecked" ), // This -target option cannot be set in the Compile configuration scope since `javadoc` doesn't // play nicely with it; see 
https://github.com/sbt/sbt/issues/355#issuecomment-3817629 for @@ -785,6 +786,7 @@ object TestSettings { javaOptions in Test += "-Dspark.ui.enabled=false", javaOptions in Test += "-Dspark.ui.showConsoleProgress=false", javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true", + javaOptions in Test += "-Dspark.ui.stateStoreType=in-memory", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=false", javaOptions in Test += "-Dderby.system.durability=test", javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark")) From 2fa4f3f5f02728b7900618ab10c97a13ef0c2d62 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 13 Oct 2016 15:30:59 -0700 Subject: [PATCH 3/3] SHS-NG M2: Store FsHistoryProvider listing data in a KVStore. The application listing is still generated from event logs, but is now stored in a KVStore instance. By default an in-memory store is used, but a new config allows setting a local disk path to store the data, in which case a LevelDB store will be created. The provider stores things internally using the public REST API types; I believe this is better going forward since it will make it easier to get rid of the internal history server API which is mostly redundant at this point. I also added a finalizer to LevelDBIterator, to make sure that resources are eventually released. This helps when code iterates but does not exhaust the iterator, thus not triggering the auto-close code. HistoryServerSuite was modified to not re-start the history server unnecessarily; this makes the json validation tests run more quickly. --- .../apache/spark/kvstore/LevelDBIterator.java | 12 + core/pom.xml | 5 + .../org/apache/spark/ui/static/historypage.js | 2 +- .../history/ApplicationHistoryProvider.scala | 12 +- .../deploy/history/FsHistoryProvider.scala | 618 ++++++++++-------- .../spark/deploy/history/HistoryServer.scala | 2 +- .../apache/spark/deploy/history/config.scala | 46 ++ .../org/apache/spark/status/api/v1/api.scala | 18 +- .../history/FsHistoryProviderSuite.scala | 57 +- .../deploy/history/HistoryServerSuite.scala | 17 +- docs/monitoring.md | 7 + .../launcher/AbstractCommandBuilder.java | 4 +- scalastyle-config.xml | 2 +- 13 files changed, 479 insertions(+), 323 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/config.scala diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java index 1a2696c7a1c1..76945d044c68 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java @@ -172,6 +172,18 @@ public synchronized void close() throws IOException { } } + /** + * Because it's tricky to expose closeable iterators through many internal APIs, especially + * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by + * the iterator will eventually be released. 
+ */ + @Override + protected void finalize() throws Throwable { + if (db.db() != null) { + close(); + } + } + private T loadNext() { if (count >= max) { return null; diff --git a/core/pom.xml b/core/pom.xml index 7f245b5b6384..a98f5da2ff40 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -67,6 +67,11 @@ spark-launcher_${scala.binary.version} ${project.version} + + org.apache.spark + spark-kvstore_${scala.binary.version} + ${project.version} + org.apache.spark spark-network-common_${scala.binary.version} diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 1f89306403cd..baa9363cf090 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -188,7 +188,7 @@ $(document).ready(function() { } $(selector).DataTable(conf); - $('#hisotry-summary [data-toggle="tooltip"]').tooltip(); + $('#history-summary [data-toggle="tooltip"]').tooltip(); }); }); }); diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 6d8758a3d3b1..e25522a28c57 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -68,11 +68,19 @@ private[history] abstract class HistoryUpdateProbe { * @param ui Spark UI * @param updateProbe probe to call to check on the update state of this application attempt */ -private[history] case class LoadedAppUI( +private[spark] case class LoadedAppUI( ui: SparkUI, updateProbe: () => Boolean) -private[history] abstract class ApplicationHistoryProvider { +private[spark] abstract class ApplicationHistoryProvider { + + /** + * The number of applications available for listing. Separate method in case it's cheaper + * to get a count than to calculate the whole listing. + * + * @return The number of available applications. + */ + def getAppCount(): Int = getListing().size /** * Returns the count of application event logs that the provider is currently still processing. 
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f4235df24512..4c0252d8e991 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,14 +17,17 @@ package org.apache.spark.deploy.history -import java.io.{FileNotFoundException, IOException, OutputStream} -import java.util.UUID +import java.io.{File, FileNotFoundException, IOException, OutputStream} +import java.util.{Date, UUID} import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -35,9 +38,12 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging +import org.apache.spark.kvstore._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -78,6 +84,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) this(conf, new SystemClock()) } + import config._ import FsHistoryProvider._ // Interval between safemode checks. @@ -94,8 +101,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS, Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) - private val logDir = conf.getOption("spark.history.fs.logDirectory") - .getOrElse(DEFAULT_LOG_DIR) + private val logDir = conf.get(EVENT_LOG_DIR) private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false) private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "") @@ -117,17 +123,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) - // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted - // into the map in order, so the LinkedHashMap maintains the correct ordering. - @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] - = new mutable.LinkedHashMap() + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]() + private val storePath = conf.get(LOCAL_STORE_DIR) - // List of application logs to be deleted by event log cleaner. 
- private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + private val listing = storePath.map { path => + val dbPath = new File(path, "listing.ldb") - private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) + def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) + + try { + val db = openDB() + val meta = db.getMetadata(classOf[KVStoreMetadata]) + + if (meta == null) { + db.setMetadata(new KVStoreMetadata(CURRENT_VERSION, logDir.toString())) + db + } else if (meta.version != CURRENT_VERSION || !logDir.toString().equals(meta.logDir)) { + logInfo("Detected mismatched config in existing DB, deleting...") + db.close() + Utils.deleteRecursively(dbPath) + openDB() + } else { + db + } + } catch { + case _: UnsupportedStoreVersionException => + logInfo("Detected incompatible DB versions, deleting...") + Utils.deleteRecursively(dbPath) + openDB() + } + }.getOrElse(new InMemoryStore()) /** * Return a runnable that performs the given operation on the event logs. @@ -229,10 +255,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator + override def getListing(): Iterator[ApplicationHistoryInfo] = { + listing.view(classOf[ApplicationInfoWrapper]) + .index("endTime") + .reverse() + .iterator() + .asScala + .map(_.toAppHistoryInfo) + } - override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = { - applications.get(appId) + override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = { + try { + Some(load(appId).toAppHistoryInfo()) + } catch { + case e: NoSuchElementException => + None + } } override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() @@ -241,40 +279,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts + .find { attempt => attempt.info.attemptId == attemptId } + .map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime()) // Do not call ui.bind() to avoid creating a new server for each application } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - 
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) - } else { - None - } - + ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") + ui.getSecurityManager.setAdminAcls(adminAcls) + ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, + appListener.viewAcls.getOrElse("")) + val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + appListener.adminAclsGroups.getOrElse("") + ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) + ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) + LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) } - } } catch { - case e: FileNotFoundException => None + case _: NoSuchElementException => None } } @@ -299,6 +334,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def stop(): Unit = { + listing.close() if (initThread != null && initThread.isAlive()) { initThread.interrupt() initThread.join() @@ -314,24 +350,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) - .getOrElse(Seq[FileStatus]()) + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList + val logInfos = statusList .filter { entry => - val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. 
!entry.getPath().getName().startsWith(".") && - prevFileSize < entry.getLen() && - SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) + SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && + recordedFileSize(entry.getPath()) < entry.getLen() } .flatMap { entry => Some(entry) } .sortWith { case (entry1, entry2) => - entry1.getModificationTime() >= entry2.getModificationTime() - } + entry1.getModificationTime() > entry2.getModificationTime() + } if (logInfos.nonEmpty) { logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") @@ -419,205 +453,99 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - applications.get(appId) match { - case Some(appInfo) => - try { - // If no attempt is specified, or there is no attemptId for attempts, return all attempts - appInfo.attempts.filter { attempt => - attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get - }.foreach { attempt => - val logPath = new Path(logDir, attempt.logPath) - zipFileToStream(logPath, attempt.logPath, zipStream) - } - } finally { - zipStream.close() + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + throw new SparkException(s"Logs for $appId not found.") + } + + try { + // If no attempt is specified, or there is no attemptId for attempts, return all attempts + attemptId + .map { id => app.attempts.filter(_.info.attemptId == Some(id)) } + .getOrElse(app.attempts) + .map(_.logPath) + .foreach { log => + zipFileToStream(new Path(logDir, log), log, zipStream) } - case None => throw new SparkException(s"Logs for $appId not found.") + } finally { + zipStream.close() } } /** - * Replay the log files in the list and merge the list of old applications with new ones + * Replay the given log file, saving the application in the listing db. */ protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { - val newAttempts = try { - val eventsFilter: ReplayEventsFilter = { eventString => - eventString.startsWith(APPL_START_EVENT_PREFIX) || - eventString.startsWith(APPL_END_EVENT_PREFIX) - } - - val logPath = fileStatus.getPath() - val appCompleted = isApplicationCompleted(fileStatus) - - // Use loading time as lastUpdated since some filesystems don't update modifiedTime - // each time file is updated. However use modifiedTime for completed jobs so lastUpdated - // won't change whenever HistoryServer restarts and reloads the file. - val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis() - - val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) - - // Without an app ID, new logs will render incorrectly in the listing page, so do not list or - // try to show their UI. - if (appListener.appId.isDefined) { - val attemptInfo = new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - lastUpdated, - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted, - fileStatus.getLen() - ) - fileToAppInfo(logPath) = attemptInfo - logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") - Some(attemptInfo) - } else { - logWarning(s"Failed to load application log ${fileStatus.getPath}. 
" + - "The application may have not started.") - None - } - - } catch { - case e: Exception => - logError( - s"Exception encountered when attempting to load application log ${fileStatus.getPath}", - e) - None + val eventsFilter: ReplayEventsFilter = { eventString => + eventString.startsWith(APPL_START_EVENT_PREFIX) || + eventString.startsWith(APPL_END_EVENT_PREFIX) } - if (newAttempts.isEmpty) { - return - } - - // Build a map containing all apps that contain new attempts. The app information in this map - // contains both the new app attempt, and those that were already loaded in the existing apps - // map. If an attempt has been updated, it replaces the old attempt in the list. - val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]() - - applications.synchronized { - newAttempts.foreach { attempt => - val appInfo = newAppMap.get(attempt.appId) - .orElse(applications.get(attempt.appId)) - .map { app => - val attempts = - app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt) - new FsApplicationHistoryInfo(attempt.appId, attempt.name, - attempts.sortWith(compareAttemptInfo)) - } - .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt))) - newAppMap(attempt.appId) = appInfo - } - - // Merge the new app list with the existing one, maintaining the expected ordering (descending - // end time). Maintaining the order is important to avoid having to sort the list every time - // there is a request for the log list. - val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo) - val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { - if (!mergedApps.contains(info.id)) { - mergedApps += (info.id -> info) - } - } + val logPath = fileStatus.getPath() + logInfo(s"Replaying log path: $logPath") - val newIterator = newApps.iterator.buffered - val oldIterator = applications.values.iterator.buffered - while (newIterator.hasNext && oldIterator.hasNext) { - if (newAppMap.contains(oldIterator.head.id)) { - oldIterator.next() - } else if (compareAppInfo(newIterator.head, oldIterator.head)) { - addIfAbsent(newIterator.next()) - } else { - addIfAbsent(oldIterator.next()) - } - } - newIterator.foreach(addIfAbsent) - oldIterator.foreach(addIfAbsent) + val bus = new ReplayListenerBus() + val listener = new AppListingListener(fileStatus, clock) + bus.addListener(listener) - applications = mergedApps - } + replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) + listener.applicationInfo.foreach(addListing) + listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) } /** * Delete event logs from the log directory according to the clean policy defined by the user. */ private[history] def cleanLogs(): Unit = { + var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None try { - val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000 - - val now = clock.getTimeMillis() - val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - - def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = { - now - attempt.lastUpdated > maxAge - } - - // Scan all logs from the log directory. - // Only completed applications older than the specified max age will be deleted. 
- applications.values.foreach { app => - val (toClean, toRetain) = app.attempts.partition(shouldClean) - attemptsToClean ++= toClean - - if (toClean.isEmpty) { - appsToRetain += (app.id -> app) - } else if (toRetain.nonEmpty) { - appsToRetain += (app.id -> - new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList)) + val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + + // Iterate descending over all applications whose oldest attempt is older than the maxAge. + iterator = Some(listing.view(classOf[ApplicationInfoWrapper]) + .index("oldestAttempt") + .reverse() + .first(maxTime) + .closeableIterator()) + + iterator.get.asScala.foreach { app => + val (remaining, toDelete) = app.attempts.partition { attempt => + attempt.info.lastUpdated.getTime() >= maxTime + } + if (remaining.nonEmpty) { + val newApp = new ApplicationInfoWrapper(app.info, remaining) + listing.write(newApp) + } else { + listing.delete(app.getClass(), app.id) } - } - - applications = appsToRetain - val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] - attemptsToClean.foreach { attempt => - try { - fs.delete(new Path(logDir, attempt.logPath), true) - } catch { - case e: AccessControlException => - logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") - case t: IOException => - logError(s"IOException in cleaning ${attempt.logPath}", t) - leftToClean += attempt + toDelete.foreach { attempt => + val logPath = new Path(logDir, attempt.logPath) + try { + listing.delete(classOf[LogInfo], logPath.toString()) + } catch { + case _: NoSuchElementException => + logDebug(s"Log info entry for $logPath not found.") + } + try { + fs.delete(logPath, true) + } catch { + case e: AccessControlException => + logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") + case t: IOException => + logError(s"IOException in cleaning ${attempt.logPath}", t) + } } } - - attemptsToClean = leftToClean } catch { - case t: Exception => logError("Exception in cleaning logs", t) + case t: Exception => logError("Exception while cleaning logs", t) + } finally { + iterator.foreach(_.close()) } } - /** - * Comparison function that defines the sort order for the application listing. - * - * @return Whether `i1` should precede `i2`. - */ - private def compareAppInfo( - i1: FsApplicationHistoryInfo, - i2: FsApplicationHistoryInfo): Boolean = { - val a1 = i1.attempts.head - val a2 = i2.attempts.head - if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime - } - - /** - * Comparison function that defines the sort order for application attempts within the same - * application. Order is: attempts are sorted by descending start time. - * Most recent attempt state matches with current state of the app. - * - * Normally applications should have a single running attempt; but failure to call sc.stop() - * may cause multiple running attempts to show up. - * - * @return Whether `a1` should precede `a2`. - */ - private def compareAttemptInfo( - a1: FsApplicationAttemptInfo, - a2: FsApplicationAttemptInfo): Boolean = { - a1.startTime >= a2.startTime - } - /** * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns * an `ApplicationEventListener` instance with event data captured from the replay. 
@@ -642,6 +570,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appListener = new ApplicationEventListener bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) + logInfo(s"Finished replaying $logPath") appListener } finally { logInput.close() @@ -678,26 +607,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * @return a summary of the component state */ override def toString: String = { - val header = s""" - | FsHistoryProvider: logdir=$logDir, - | last scan time=$lastScanTime - | Cached application count =${applications.size}} - """.stripMargin - val sb = new StringBuilder(header) - applications.foreach(entry => sb.append(entry._2).append("\n")) - sb.toString - } - - /** - * Look up an application attempt - * @param appId application ID - * @param attemptId Attempt ID, if set - * @return the matching attempt, if found - */ - def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId) - } + val count = listing.count(classOf[ApplicationInfoWrapper]) + s"""|FsHistoryProvider{logdir=$logDir, + | storedir=$storePath, + | last scan time=$lastScanTime + | application count=$count}""".stripMargin } /** @@ -715,19 +629,64 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { - lookup(appId, attemptId) match { - case None => - logDebug(s"Application Attempt $appId/$attemptId not found") - false - case Some(latest) => - prevFileSize < latest.fileSize + try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize + } catch { + case _: NoSuchElementException => false + } + } + + private def recordedFileSize(log: Path): Long = { + try { + listing.read(classOf[LogInfo], log.toString()).fileSize + } catch { + case _: NoSuchElementException => 0L + } + } + + private def load(appId: String): ApplicationInfoWrapper = { + listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { + val attempt = app.attempts.head + + val oldApp = try { + listing.read(classOf[ApplicationInfoWrapper], app.id) + } catch { + case _: NoSuchElementException => + app + } + + def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { + a1.info.startTime.getTime() > a2.info.startTime.getTime() } + + val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ + List(attempt) + val oldestAttempt = attempts.map(_.info.lastUpdated.getTime()).min + + val newAppInfo = new ApplicationInfoWrapper( + app.info, + attempts.sortWith(compareAttemptInfo)) + listing.write(newAppInfo) + } + + /** For testing. Returns internal data about a single attempt. 
*/ + private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { + load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( + throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId.")) } + } private[history] object FsHistoryProvider { - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" - private val NOT_STARTED = "" private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" @@ -735,52 +694,137 @@ private[history] object FsHistoryProvider { private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" + + private val CURRENT_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - * the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +case class KVStoreMetadata( + val version: Long, + val logDir: String) + +case class LogInfo( + @KVIndexParam val logPath: String, + val fileSize: Long) + +private[history] class AttemptInfoWrapper( + val info: v1.ApplicationAttemptInfo, val logPath: String, - val name: String, - val appId: String, - attemptId: Option[String], - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String, - completed: Boolean, - val fileSize: Long) - extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed) { - - /** extend the superclass string value with the extra attributes of this class */ - override def toString: String = { - s"FsApplicationAttemptInfo($name, $appId," + - s" ${super.toString}, source=$logPath, size=$fileSize" + val fileSize: Long) { + + def toAppAttemptInfo(): ApplicationAttemptInfo = { + ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), + info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, + info.completed) } + } -/** - * Application history information - * @param id application ID - * @param name application name - * @param attempts list of attempts, most recent first. 
- */ -private class FsApplicationHistoryInfo( - id: String, - override val name: String, - override val attempts: List[FsApplicationAttemptInfo]) - extends ApplicationHistoryInfo(id, name, attempts) +private[history] class ApplicationInfoWrapper( + val info: v1.ApplicationInfo, + val attempts: List[AttemptInfoWrapper]) { + + @JsonIgnore @KVIndexParam + def id: String = info.id + + @JsonIgnore @KVIndexParam("endTime") + def endTime(): Long = attempts.head.info.endTime.getTime() + + @JsonIgnore @KVIndexParam("oldestAttempt") + def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min + + def toAppHistoryInfo(): ApplicationHistoryInfo = { + ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) + } + + def toApiInfo(): v1.ApplicationInfo = { + new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores, + info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info)) + } + +} + +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { + + private val app = new MutableApplicationInfo() + private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + app.id = event.appId.orNull + app.name = event.appName + + attempt.attemptId = event.appAttemptId + attempt.startTime = new Date(event.time) + attempt.lastUpdated = new Date(clock.getTimeMillis()) + attempt.sparkUser = event.sparkUser + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + attempt.endTime = new Date(event.time) + attempt.lastUpdated = new Date(log.getModificationTime()) + attempt.duration = event.time - attempt.startTime.getTime() + attempt.completed = true + } + + def applicationInfo: Option[ApplicationInfoWrapper] = { + if (app.id != null) { + Some(app.toView(List(attempt.toView()))) + } else { + None + } + } + + private class MutableApplicationInfo { + var id: String = null + var name: String = null + var coresGranted: Option[Int] = None + var maxCores: Option[Int] = None + var coresPerExecutor: Option[Int] = None + var memoryPerExecutorMB: Option[Int] = None + + def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper = { + val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor, + memoryPerExecutorMB, Nil) + new ApplicationInfoWrapper(apiInfo, attempts) + } + + } + + private class MutableAttemptInfo(logPath: String, fileSize: Long) { + var attemptId: Option[String] = None + var startTime = new Date(-1) + var endTime = new Date(-1) + var lastUpdated = new Date(-1) + var duration = 0L + var sparkUser: String = null + var completed = false + + def toView(): AttemptInfoWrapper = { + val apiInfo = new v1.ApplicationAttemptInfo( + attemptId, + startTime, + endTime, + lastUpdated, + duration, + sparkUser, + completed) + new AttemptInfoWrapper( + apiInfo, + logPath, + fileSize) + } + + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index d9c8fda99ef9..949b30782021 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -325,7 +325,7 @@ object HistoryServer extends Logging { } } - private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = { + private[spark] def getAttemptURI(appId: String, attemptId: 
Option[String]): String = { val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("") s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}" } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala new file mode 100644 index 000000000000..6d4b3712ca5b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") + .stringConf + .createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") + .stringConf + .createOptional + +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 56d8e51732ff..d66117410f2c 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -39,9 +39,21 @@ class ApplicationAttemptInfo private[spark]( val duration: Long, val sparkUser: String, val completed: Boolean = false) { - def getStartTimeEpoch: Long = startTime.getTime - def getEndTimeEpoch: Long = endTime.getTime - def getLastUpdatedEpoch: Long = lastUpdated.getTime + + def getStartTimeEpoch: Long = startTime.getTime + + def getEndTimeEpoch: Long = endTime.getTime + + def getLastUpdatedEpoch: Long = lastUpdated.getTime + + // These setter methods are here for Jackson, since different parts of the code try to create + // instances of this class from serialized JSON and fail if these are not present. 
+ + private def setStartTimeEpoch(unused: Long): Unit = { } + + private def setEndTimeEpoch(unused: Long): Unit = { } + + private def setLastUpdatedEpoch(unused: Long): Unit = { } } class ExecutorStageSummary private[spark]( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 456158d41b93..483bd648ddb8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -37,6 +37,7 @@ import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.io._ import org.apache.spark.scheduler._ @@ -55,6 +56,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc Utils.deleteRecursively(testDir) } + private def newProvider( + conf: SparkConf, + clock: Clock = null): FsHistoryProvider = { + if (clock == null) { + new FsHistoryProvider(conf) + } else { + new FsHistoryProvider(conf, clock) + } + } + /** Create a fake log file using the new log format used in Spark 1.3+ */ private def newLogFile( appId: String, @@ -69,7 +80,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("Parse application logs") { val clock = new ManualClock(12345678) - val provider = new FsHistoryProvider(createTestConf(), clock) + val provider = newProvider(createTestConf(), clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -164,7 +175,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("history file is renamed from inprogress to completed") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -173,20 +184,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should endWith(EventLoggingListener.IN_PROGRESS) } logFile1.renameTo(newLogFile("app1", None, inProgress = false)) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should not endWith(EventLoggingListener.IN_PROGRESS) } } test("Parse logs that application is not started") { - val provider = new FsHistoryProvider((createTestConf())) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -198,7 +207,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-5582: empty log directory") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -214,7 +223,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter 
with Matc } test("apps with multiple attempts with order") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, true, None, @@ -275,7 +284,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("log cleaner") { val maxAge = TimeUnit.SECONDS.toMillis(10) val clock = new ManualClock(maxAge / 2) - val provider = new FsHistoryProvider( + val provider = newProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) @@ -321,7 +330,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20) val maxAge = TimeUnit.SECONDS.toMillis(40) val clock = new ManualClock(0) - val provider = new FsHistoryProvider( + val provider = newProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("inProgressApp1", None, inProgress = true) @@ -343,23 +352,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.checkForLogs() // This should not trigger any cleanup - updateAndCheck(provider)(list => list.size should be(2)) + updateAndCheck(provider) { list => + list.size should be(2) + } // Should trigger cleanup for first file but not second one clock.setTime(firstFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(1)) + updateAndCheck(provider) { list => + list.size should be(1) + } assert(!log1.exists()) assert(log2.exists()) // Should cleanup the second file as well. clock.setTime(secondFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(0)) + updateAndCheck(provider) { list => + list.size should be(0) + } assert(!log1.exists()) assert(!log2.exists()) } test("Event log copy") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logs = (1 to 2).map { i => val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) writeFile(log, true, None, @@ -394,7 +409,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-8372: new logs with no app ID are ignored") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) // Write a new log file without an app id, to make sure it's ignored. val logFile1 = newLogFile("app1", None, inProgress = true) @@ -408,7 +423,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("provider correctly checks whether fs is in safe mode") { - val provider = spy(new FsHistoryProvider(createTestConf())) + val provider = spy(newProvider(createTestConf())) val dfs = mock(classOf[DistributedFileSystem]) // Asserts that safe mode is false because we can't really control the return value of the mock, // since the API is different between hadoop 1 and 2. 
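The log-cleaner tests above all follow the same shape; a condensed sketch of that pattern (illustrative only, reusing the suite's own helpers `newProvider`, `createTestConf`, `newLogFile`, `writeFile` and `updateAndCheck` exactly as they appear in this diff):

    // Illustrative test skeleton, not part of the patch: a ManualClock drives the
    // retention window, and each updateAndCheck() call re-scans the log directory
    // and asserts over the resulting application listing.
    val maxAge = TimeUnit.SECONDS.toMillis(10)
    val clock = new ManualClock(maxAge / 2)
    val provider = newProvider(
      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)

    val log = newLogFile("app1", Some("attempt1"), inProgress = false)
    writeFile(log, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
      SparkListenerApplicationEnd(2L))
    log.setLastModified(clock.getTimeMillis())

    updateAndCheck(provider) { list =>
      list.size should be (1)
    }

    // Move the clock past the retention window; the next scan should drop the
    // application from the listing and delete the log file itself.
    clock.setTime(clock.getTimeMillis() + maxAge + 1)
    updateAndCheck(provider) { list =>
      list.size should be (0)
    }
    assert(!log.exists())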
@@ -480,7 +495,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc SparkListenerApplicationEnd(5L) ) - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) updateAndCheck(provider) { list => list.size should be (1) list(0).name should be ("real-app") @@ -497,7 +512,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc var provider: FsHistoryProvider = null try { - provider = new FsHistoryProvider(conf) + provider = newProvider(conf) val log = newLogFile("app1", Some("attempt1"), inProgress = false) writeFile(log, true, None, SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), @@ -625,7 +640,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } private def createTestConf(): SparkConf = { - new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath()) } private class SafeModeTestProvider(conf: SparkConf, clock: Clock) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 95acb9a54440..4277b82faa27 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -43,6 +43,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.selenium.WebBrowser import org.apache.spark._ +import org.apache.spark.deploy.history.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.JobUIData import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -64,6 +65,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private val logDir = getTestResourcePath("spark-events") private val expRoot = getTestResourceFile("HistoryServerExpectations") + private val storeDir = Utils.createTempDir(namePrefix = "history") private var provider: FsHistoryProvider = null private var server: HistoryServer = null @@ -74,6 +76,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) conf.setAll(extraConf) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -87,14 +90,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers def stop(): Unit = { server.stop() + server = null } before { - init() - } - - after{ - stop() + if (server == null) { + init() + } } val cases = Seq( @@ -296,6 +298,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -372,6 +375,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { + server.stop() implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -388,6 +392,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers .set("spark.history.fs.update.interval", "1s") .set("spark.eventLog.enabled", "true") .set("spark.history.cache.window", "250ms") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .remove("spark.testing") val provider = new FsHistoryProvider(myConf) val securityManager = HistoryServer.createSecurityManager(myConf) @@ -413,8 +418,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - // stop the server with the old config, and start the new one - server.stop() server = new HistoryServer(myConf, provider, securityManager, 18080) server.initialize() server.bind() diff --git a/docs/monitoring.md b/docs/monitoring.md index 3e577c5f3677..6bbd3e45be54 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -220,6 +220,13 @@ The history server can be configured as follows: Number of threads that will be used by history server to process event logs. + + spark.history.store.path + /var/lib/spark-history + + Local directory where history server will cache application history data. + + Note that in all of these UIs, the tables are sortable by clicking their headers, diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 6c0c3ebcaebf..c71a71bbdd30 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -144,6 +144,7 @@ List buildClassPath(String appClassPath) throws IOException { if (prependClasses || isTesting) { String scala = getScalaVersion(); List projects = Arrays.asList( + "common/kvstore", "common/network-common", "common/network-shuffle", "common/network-yarn", @@ -162,7 +163,8 @@ List buildClassPath(String appClassPath) throws IOException { "sql/core", "sql/hive", "sql/hive-thriftserver", - "streaming" + "streaming", + "ui" ); if (prependClasses) { if (!isTesting) { diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 1f48d71cc7a2..656c7614c2bf 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -86,7 +86,7 @@ This file is divided into 3 sections: - +
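As the monitoring.md addition above notes, the new on-disk cache is opt-in. A minimal sketch of wiring it up from Spark-internal code (illustrative only; the directory values are placeholders, and `LOCAL_STORE_DIR` is the optional entry defined in the new config.scala):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.history.config._

    // Illustrative only: LOCAL_STORE_DIR is declared with createOptional, so the
    // provider reads it back as an Option[String]; when unset, the history server
    // keeps its listing data in memory instead of a local directory.
    val conf = new SparkConf()
      .set("spark.history.fs.logDirectory", "hdfs:///spark-events")  // placeholder path
      .set(LOCAL_STORE_DIR, "/var/lib/spark-history")                // placeholder, mirrors the docs example

    val storePath: Option[String] = conf.get(LOCAL_STORE_DIR)

This mirrors how the test suites in this patch set `LOCAL_STORE_DIR` to a temporary directory before constructing an FsHistoryProvider.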