common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java
@@ -17,6 +17,7 @@

package org.apache.spark.util.kvstore;

import java.io.Serializable;
import java.util.Arrays;

import com.google.common.base.Preconditions;
@@ -34,7 +35,7 @@
* This class is not efficient and is mostly meant to compare really small arrays, like those
* generally used as indices and keys in a KVStore.
*/
class ArrayWrappers {
class ArrayWrappers implements Serializable {

@SuppressWarnings("unchecked")
public static Comparable<Object> forArray(Object a) {
@@ -53,7 +54,7 @@ public static Comparable<Object> forArray(Object a) {
return (Comparable<Object>) ret;
}

private static class ComparableIntArray implements Comparable<ComparableIntArray> {
private static class ComparableIntArray implements Comparable<ComparableIntArray>, Serializable {

private final int[] array;

@@ -92,7 +93,8 @@ public int compareTo(ComparableIntArray other) {
}
}

private static class ComparableLongArray implements Comparable<ComparableLongArray> {
private static class ComparableLongArray
implements Comparable<ComparableLongArray>, Serializable {

private final long[] array;

@@ -131,7 +133,8 @@ public int compareTo(ComparableLongArray other) {
}
}

private static class ComparableByteArray implements Comparable<ComparableByteArray> {
private static class ComparableByteArray
implements Comparable<ComparableByteArray>, Serializable {

private final byte[] array;

@@ -170,7 +173,8 @@ public int compareTo(ComparableByteArray other) {
}
}

private static class ComparableObjectArray implements Comparable<ComparableObjectArray> {
private static class ComparableObjectArray
implements Comparable<ComparableObjectArray>, Serializable {

private final Object[] array;

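The wrappers only need to become `Serializable` so that a whole `InMemoryStore`, whose index keys may be wrapped arrays, can be written out with plain Java serialization. A minimal round-trip sketch, assuming it is compiled into the same `org.apache.spark.util.kvstore` package (the `ArrayWrappers` class itself is package-private); the `ArrayWrapperRoundTrip` object is hypothetical and for illustration only:

```scala
package org.apache.spark.util.kvstore

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical example; must live in this package because ArrayWrappers is package-private.
object ArrayWrapperRoundTrip {
  def main(args: Array[String]): Unit = {
    // Wrap an int[] key the same way InMemoryStore does for its index keys.
    val key = ArrayWrappers.forArray(Array(1, 2, 3))

    // Write the wrapper out with plain Java serialization...
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(key)
    out.close()

    // ...and read it back. Without the Serializable markers added above, this write
    // would fail with java.io.NotSerializableException.
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = in.readObject().asInstanceOf[Comparable[Object]]
    in.close()

    // The restored wrapper still compares equal to the original.
    assert(copy.compareTo(key) == 0)
  }
}
```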
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
@@ -17,13 +17,8 @@

package org.apache.spark.util.kvstore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
@@ -42,7 +37,7 @@
* according to the index. This saves memory but makes iteration more expensive.
*/
@Private
public class InMemoryStore implements KVStore {
public class InMemoryStore implements KVStore, Serializable {

private Object metadata;
private InMemoryLists inMemoryLists = new InMemoryLists();
@@ -143,7 +138,7 @@ private static <T> KVStoreView<T> emptyView() {
* Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
* class of type T to an InstanceList of type T.
*/
private static class InMemoryLists {
private static class InMemoryLists implements Serializable {
private final ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();

@SuppressWarnings("unchecked")
@@ -163,7 +158,7 @@ public void clear() {
}
}

private static class InstanceList<T> {
private static class InstanceList<T> implements Serializable {

/**
* A BiConsumer to control multi-entity removal. We use this in a forEach rather than an
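With `InMemoryStore`, `InMemoryLists`, and `InstanceList` all `Serializable`, a populated store can be snapshotted to bytes and restored, which is the mechanism the checkpoint below relies on. A sketch under assumptions: `JobSnapshot` and `InMemoryStoreRoundTrip` are hypothetical stand-ins for the real SHS types, and the natural `@KVIndex` is placed on the getter the same way Spark's own store types do it:

```scala
import scala.annotation.meta.getter

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical entity for this sketch; the unnamed @KVIndex on the getter is the natural
// index that every KVStore type needs.
class JobSnapshot(@(KVIndex @getter) val id: Int, val name: String) extends Serializable

object InMemoryStoreRoundTrip {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore()
    store.write(new JobSnapshot(1, "collect"))
    store.write(new JobSnapshot(2, "count"))

    // Snapshot the whole store with Spark's JavaSerializer, which is how the checkpoint
    // is produced and consumed in the History Server changes below.
    val serializer = new JavaSerializer(new SparkConf()).newInstance()
    val restored = serializer.deserialize[InMemoryStore](serializer.serialize(store))

    // The restored store answers reads against the natural index as before.
    assert(restored.count(classOf[JobSnapshot]) == 2L)
    assert(restored.read(classOf[JobSnapshot], 1).name == "collect")
  }
}
```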
common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
@@ -17,6 +17,10 @@

package org.apache.spark.util.kvstore;

import java.io.IOException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -31,7 +35,7 @@
* Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
*/
@Private
public class KVTypeInfo {
public class KVTypeInfo implements Serializable {

private final Class<?> type;
private final Map<String, KVIndex> indices;
@@ -120,7 +124,7 @@ Accessor getParentAccessor(String indexName) {
/**
* Abstracts the difference between invoking a Field and a Method.
*/
interface Accessor {
interface Accessor extends Serializable {

Object get(Object instance) throws ReflectiveOperationException;

@@ -129,7 +133,7 @@ interface Accessor {

private class FieldAccessor implements Accessor {

private final Field field;
private Field field;

FieldAccessor(Field field) {
this.field = field;
@@ -144,11 +148,24 @@ public Object get(Object instance) throws ReflectiveOperationException {
public Class getType() {
return field.getType();
}

private void writeObject(ObjectOutputStream out) throws IOException {
out.writeUTF(field.getName());
out.writeObject(field.getDeclaringClass());
}

private void readObject(ObjectInputStream in)
throws IOException, ClassNotFoundException, NoSuchFieldException {
String name = in.readUTF();
Class<?> clazz = (Class<?>) in.readObject();
field = clazz.getDeclaredField(name);
field.setAccessible(true);
}
}

private class MethodAccessor implements Accessor {

private final Method method;
private Method method;

MethodAccessor(Method method) {
this.method = method;
@@ -163,6 +180,21 @@ public Object get(Object instance) throws ReflectiveOperationException {
public Class getType() {
return method.getReturnType();
}

private void writeObject(ObjectOutputStream out) throws IOException {
out.writeUTF(method.getName());
out.writeObject(method.getDeclaringClass());
out.writeObject(method.getParameterTypes());
}

private void readObject(ObjectInputStream in)
throws IOException, ClassNotFoundException, NoSuchMethodException {
String name = in.readUTF();
Class<?> clazz = (Class<?>) in.readObject();
Class<?>[] parameters = (Class<?>[]) in.readObject();
method = clazz.getDeclaredMethod(name, parameters);
method.setAccessible(true);
}
}

}
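`java.lang.reflect.Field` and `Method` are not serializable, so the accessors above persist only their coordinates (declaring class, member name, and, for methods, parameter types) and re-resolve the handle in `readObject`. The same pattern in isolation, as a hypothetical Scala sketch rather than code from the PR:

```scala
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.lang.reflect.Method

// Hypothetical stand-alone version of the MethodAccessor pattern: persist the coordinates
// of a reflective handle and look it up again on deserialization.
class SerializableMethodRef(@transient private var method: Method) extends Serializable {

  def invoke(target: AnyRef, args: AnyRef*): AnyRef = method.invoke(target, args: _*)

  @throws[IOException]
  private def writeObject(out: ObjectOutputStream): Unit = {
    // Method itself is not serializable, so write what is needed to find it again.
    out.writeUTF(method.getName)
    out.writeObject(method.getDeclaringClass)
    out.writeObject(method.getParameterTypes)
  }

  @throws[IOException]
  @throws[ClassNotFoundException]
  private def readObject(in: ObjectInputStream): Unit = {
    // Re-resolve the handle from the declaring class, name, and parameter types.
    val name = in.readUTF()
    val clazz = in.readObject().asInstanceOf[Class[_]]
    val parameters = in.readObject().asInstanceOf[Array[Class[_]]]
    method = clazz.getDeclaredMethod(name, parameters: _*)
    method.setAccessible(true)
  }
}
```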
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy.history

import java.io.{File, FileNotFoundException, IOException}
import java.io._
import java.nio.file.Files
import java.util.{Date, ServiceLoader}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
@@ -50,6 +50,7 @@ import org.apache.spark.internal.config.UI._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
@@ -131,6 +132,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)

// a JavaSerializer used to deserialize InMemoryStoreSnapshot
val serializer = new JavaSerializer(conf).newInstance()

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile()
@@ -441,6 +445,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// reading a garbage file is safe, but we would log an error which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
!entry.getPath().getName().endsWith(EventLoggingListener.CHECKPOINT) &&
!entry.getPath().getName().endsWith(s"${EventLoggingListener.CHECKPOINT}.tmp") &&
!isBlacklisted(entry.getPath)
}
.filter { entry =>
@@ -950,14 +956,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def rebuildAppStore(
store: KVStore,
eventLog: FileStatus,
lastUpdated: Long): Unit = {
lastUpdated: Long,
eventSkipNum: Int = 0): Unit = {
// Disable async updates, since they cause higher memory usage, and it's ok to take longer
// to parse the event logs in the SHS.
val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
val trackingStore = new ElementTrackingStore(store, replayConf)
val replayBus = new ReplayListenerBus()
val replayBus = new ReplayListenerBus(eventSkipNum)
val listener = new AppStatusListener(trackingStore, replayConf, false,
lastUpdateTime = Some(lastUpdated))
lastUpdateTime = Some(lastUpdated), initLiveEntitiesFromStore = eventSkipNum > 0)
replayBus.addListener(listener)

for {
@@ -1084,10 +1091,47 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
val store = new InMemoryStore()
val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
store
val imsSnapshot = getOrCreateInMemoryStoreSnapshot(attempt)
val store = imsSnapshot.store
if (imsSnapshot.finished) {
store
} else {
val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
rebuildAppStore(store, status, attempt.info.lastUpdated.getTime(), imsSnapshot.eventsNum)
store
}
}

private def getOrCreateInMemoryStoreSnapshot(attempt: AttemptInfoWrapper)
: InMemoryStoreSnapshot = {
if (conf.get(IMS_CHECKPOINT_ENABLED)) {
val ckpPath = new Path(logDir, attempt.logPath + EventLoggingListener.CHECKPOINT)
if (fs.exists(ckpPath)) {
try {
logInfo(s"Loading InMemoryStore checkpoint file: $ckpPath")
Utils.tryWithResource(EventLoggingListener.openEventLog(ckpPath, fs)) { in =>
val objIn = serializer.deserializeStream(in)
val startNs = System.nanoTime()
val imsSnapshot = objIn.readObject[InMemoryStoreSnapshot]()
objIn.close()
val finishedNs = System.nanoTime()
val duration = TimeUnit.NANOSECONDS.toMillis(finishedNs - startNs)
logInfo(s"Loaded InMemoryStore, eventsNum=${imsSnapshot.eventsNum}, " +
s"finished=${imsSnapshot.finished}, took ${duration}ms")
// +1 for skipping SparkListenerLogStart
imsSnapshot.copy(eventsNum = imsSnapshot.eventsNum + 1)
}
} catch {
case e: Exception =>
logError("Failed to load InMemoryStore checkpoint file, use log file instead", e)
InMemoryStoreSnapshot(new InMemoryStore, 0, false)
}
} else {
InMemoryStoreSnapshot(new InMemoryStore, 0, false)
}
} else {
InMemoryStoreSnapshot(new InMemoryStore, 0, false)
}
}

private def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
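`InMemoryStoreSnapshot` is read and copied above but defined elsewhere in the PR; from its usage (`store`, `eventsNum`, `finished`, and the `copy(...)` call) it is presumably a small serializable case class roughly like the sketch below. When the checkpoint cannot be read or the feature is disabled, `createInMemoryStore` falls back to an empty store with `eventsNum = 0` and `finished = false`, which is simply the old full-replay path.

```scala
import org.apache.spark.util.kvstore.InMemoryStore

// Sketch only, not the PR's actual definition.
//   store     - the checkpointed InMemoryStore written by the live application
//   eventsNum - how many event-log lines were already applied when the checkpoint was
//               taken, i.e. how many lines ReplayListenerBus may skip on replay
//   finished  - whether the application had finished; if so, no replay is needed at all
case class InMemoryStoreSnapshot(
    store: InMemoryStore,
    eventsNum: Int,
    finished: Boolean) extends Serializable
```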
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -19,8 +19,30 @@ package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

import org.apache.spark.network.util.ByteUnit

private[spark] object Status {

val IMS_CHECKPOINT_ENABLED =
ConfigBuilder("spark.appStateStore.checkpoint.enabled")
.doc("Whether to checkpoint InMemoryStore in a live AppStatusListener, in order to " +
"accelerate the startup speed of History Server.")
.booleanConf
.createWithDefault(false)

val IMS_CHECKPOINT_BATCH_SIZE =
ConfigBuilder("spark.appStateStore.ims.checkpoint.batchSize")
.doc("The minimal batch size to trigger the checkpoint for InMemoryStore.")
.intConf
.createWithDefault(5000)

val IMS_CHECKPOINT_BUFFER_SIZE =
ConfigBuilder("spark.appStateStore.ims.checkpoint.bufferSize")
.doc("Buffer size to use when checkpoint InMemoryStore to output streams, " +
"in KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("100k")

val ASYNC_TRACKING_ENABLED = ConfigBuilder("spark.appStateStore.asyncTracking.enable")
.booleanConf
.createWithDefault(true)
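The feature stays off by default. A minimal way to opt in and tune the two knobs from application code would look like the fragment below; the batch and buffer values are arbitrary examples, not recommendations. Since the checkpoint file is written next to the event log, this presumably only has an effect when event logging (`spark.eventLog.enabled`) is on as well.

```scala
import org.apache.spark.SparkConf

// Opt in to live InMemoryStore checkpointing; the two tuning keys fall back to their
// defaults (5000 events, 100 KiB) when unset.
val conf = new SparkConf()
  .set("spark.appStateStore.checkpoint.enabled", "true")
  .set("spark.appStateStore.ims.checkpoint.batchSize", "10000")
  .set("spark.appStateStore.ims.checkpoint.bufferSize", "256k")
```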
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -345,6 +345,7 @@ private[spark] class EventLoggingListener(
}

private[spark] object EventLoggingListener extends Logging {
val CHECKPOINT = ".checkpoint"
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"
@@ -430,7 +431,7 @@ private[spark] object EventLoggingListener extends Logging {
def codecName(log: Path): Option[String] = {
// Compression codec is encoded as an extension, e.g. app_123.lzf
// Since we sanitize the app ID to not include periods, it is safe to split on it
val logName = log.getName.stripSuffix(IN_PROGRESS)
val logName = log.getName.stripSuffix(CHECKPOINT).stripSuffix(IN_PROGRESS)
logName.split("\\.").tail.lastOption
}

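Checkpoint files reuse the event-log name plus the new `.checkpoint` suffix, so the extra `stripSuffix` keeps codec detection working for them. A small illustration, placed in `org.apache.spark.scheduler` only because the `EventLoggingListener` object is `private[spark]`; the `CodecNameExample` object is hypothetical:

```scala
package org.apache.spark.scheduler

import org.apache.hadoop.fs.Path

// The codec is encoded as an extension (e.g. app_123.lzf), so it must be recovered after
// stripping the ".checkpoint" and ".inprogress" suffixes.
object CodecNameExample {
  def main(args: Array[String]): Unit = {
    assert(EventLoggingListener.codecName(new Path("app_123.lzf")) == Some("lzf"))
    assert(EventLoggingListener.codecName(new Path("app_123.lzf.inprogress")) == Some("lzf"))
    assert(EventLoggingListener.codecName(new Path("app_123.lzf.checkpoint")) == Some("lzf"))
    assert(EventLoggingListener.codecName(new Path("app_123")) == None)
  }
}
```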
core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -32,7 +32,8 @@ import org.apache.spark.util.JsonProtocol
/**
* A SparkListenerBus that can be used to replay events from serialized event data.
*/
private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
private[spark] class ReplayListenerBus(eventSkipNum: Int = 0)
extends SparkListenerBus with Logging {

/**
* Replay each event in the order maintained in the given stream. The stream is expected to
@@ -75,7 +76,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
try {
val lineEntries = lines
.zipWithIndex
.filter { case (line, _) => eventsFilter(line) }
.filter { case (line, index) => index >= eventSkipNum && eventsFilter(line) }

while (lineEntries.hasNext) {
try {
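The skip is purely positional and happens before any JSON parsing: the first `eventSkipNum` lines are assumed to be reflected in the checkpointed store already. Mirroring the new filter on a plain iterator, with placeholder values instead of real event-log lines:

```scala
// Illustration of the positional skip added to ReplayListenerBus: lines with an index
// below eventSkipNum are dropped before they would ever be parsed.
object EventSkipExample {
  def main(args: Array[String]): Unit = {
    val eventSkipNum = 3
    val lines = Iterator("e0", "e1", "e2", "e3", "e4")
    val replayed = lines.zipWithIndex
      .filter { case (_, index) => index >= eventSkipNum }
      .map(_._1)
      .toList
    assert(replayed == List("e3", "e4"))
  }
}
```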