diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java index 9bc8c55bd5389..399b2dfb147f7 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java @@ -17,6 +17,7 @@ package org.apache.spark.util.kvstore; +import java.io.Serializable; import java.util.Arrays; import com.google.common.base.Preconditions; @@ -34,7 +35,7 @@ * This class is not efficient and is mostly meant to compare really small arrays, like those * generally used as indices and keys in a KVStore. */ -class ArrayWrappers { +class ArrayWrappers implements Serializable { @SuppressWarnings("unchecked") public static Comparable forArray(Object a) { @@ -53,7 +54,7 @@ public static Comparable forArray(Object a) { return (Comparable) ret; } - private static class ComparableIntArray implements Comparable { + private static class ComparableIntArray implements Comparable, Serializable { private final int[] array; @@ -92,7 +93,8 @@ public int compareTo(ComparableIntArray other) { } } - private static class ComparableLongArray implements Comparable { + private static class ComparableLongArray + implements Comparable, Serializable { private final long[] array; @@ -131,7 +133,8 @@ public int compareTo(ComparableLongArray other) { } } - private static class ComparableByteArray implements Comparable { + private static class ComparableByteArray + implements Comparable, Serializable { private final byte[] array; @@ -170,7 +173,8 @@ public int compareTo(ComparableByteArray other) { } } - private static class ComparableObjectArray implements Comparable { + private static class ComparableObjectArray + implements Comparable, Serializable { private final Object[] array; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index 6af45aec3c7b2..effcffc001387 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -17,13 +17,8 @@ package org.apache.spark.util.kvstore; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.HashSet; -import java.util.List; -import java.util.NoSuchElementException; +import java.io.Serializable; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.BiConsumer; @@ -42,7 +37,7 @@ * according to the index. This saves memory but makes iteration more expensive. */ @Private -public class InMemoryStore implements KVStore { +public class InMemoryStore implements KVStore, Serializable { private Object metadata; private InMemoryLists inMemoryLists = new InMemoryLists(); @@ -143,7 +138,7 @@ private static KVStoreView emptyView() { * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a * class of type T to an InstanceList of type T. 
*/ - private static class InMemoryLists { + private static class InMemoryLists implements Serializable { private final ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); @SuppressWarnings("unchecked") @@ -163,7 +158,7 @@ public void clear() { } } - private static class InstanceList { + private static class InstanceList implements Serializable { /** * A BiConsumer to control multi-entity removal. We use this in a forEach rather than an diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java index b8c5fab8709ed..a2fde1379e26f 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java @@ -17,6 +17,10 @@ package org.apache.spark.util.kvstore; +import java.io.IOException; +import java.io.Serializable; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.HashMap; @@ -31,7 +35,7 @@ * Wrapper around types managed in a KVStore, providing easy access to their indexed fields. */ @Private -public class KVTypeInfo { +public class KVTypeInfo implements Serializable { private final Class type; private final Map indices; @@ -120,7 +124,7 @@ Accessor getParentAccessor(String indexName) { /** * Abstracts the difference between invoking a Field and a Method. */ - interface Accessor { + interface Accessor extends Serializable { Object get(Object instance) throws ReflectiveOperationException; @@ -129,7 +133,7 @@ interface Accessor { private class FieldAccessor implements Accessor { - private final Field field; + private Field field; FieldAccessor(Field field) { this.field = field; @@ -144,11 +148,24 @@ public Object get(Object instance) throws ReflectiveOperationException { public Class getType() { return field.getType(); } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(field.getName()); + out.writeObject(field.getDeclaringClass()); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException, NoSuchFieldException { + String name = in.readUTF(); + Class clazz = (Class) in.readObject(); + field = clazz.getDeclaredField(name); + field.setAccessible(true); + } } private class MethodAccessor implements Accessor { - private final Method method; + private Method method; MethodAccessor(Method method) { this.method = method; @@ -163,6 +180,21 @@ public Object get(Object instance) throws ReflectiveOperationException { public Class getType() { return method.getReturnType(); } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(method.getName()); + out.writeObject(method.getDeclaringClass()); + out.writeObject(method.getParameterTypes()); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException, NoSuchMethodException { + String name = in.readUTF(); + Class clazz = (Class) in.readObject(); + Class[] parameters = (Class[]) in.readObject(); + method = clazz.getDeclaredMethod(name, parameters); + method.setAccessible(true); + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5f9b18ce01279..f9ac37c056b1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{File, FileNotFoundException, IOException} +import java.io._ import java.nio.file.Files import java.util.{Date, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} @@ -50,6 +50,7 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} @@ -131,6 +132,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_)) private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING) + // a JavaSerializer used to deserialize InMemoryStoreSnapshot + val serializer = new JavaSerializer(conf).newInstance() + // Visible for testing. private[history] val listing: KVStore = storePath.map { path => val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile() @@ -441,6 +445,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && + !entry.getPath().getName().endsWith(EventLoggingListener.CHECKPOINT) && + !entry.getPath().getName().endsWith(s"${EventLoggingListener.CHECKPOINT}.tmp") && !isBlacklisted(entry.getPath) } .filter { entry => @@ -950,14 +956,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def rebuildAppStore( store: KVStore, eventLog: FileStatus, - lastUpdated: Long): Unit = { + lastUpdated: Long, + eventSkipNum: Int = 0): Unit = { // Disable async updates, since they cause higher memory usage, and it's ok to take longer // to parse the event logs in the SHS. 
val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false) val trackingStore = new ElementTrackingStore(store, replayConf) - val replayBus = new ReplayListenerBus() + val replayBus = new ReplayListenerBus(eventSkipNum) val listener = new AppStatusListener(trackingStore, replayConf, false, - lastUpdateTime = Some(lastUpdated)) + lastUpdateTime = Some(lastUpdated), initLiveEntitiesFromStore = eventSkipNum > 0) replayBus.addListener(listener) for { @@ -1084,10 +1091,47 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { - val store = new InMemoryStore() - val status = fs.getFileStatus(new Path(logDir, attempt.logPath)) - rebuildAppStore(store, status, attempt.info.lastUpdated.getTime()) - store + val imsSnapshot = getOrCreateInMemoryStoreSnapshot(attempt) + val store = imsSnapshot.store + if (imsSnapshot.finished) { + store + } else { + val status = fs.getFileStatus(new Path(logDir, attempt.logPath)) + rebuildAppStore(store, status, attempt.info.lastUpdated.getTime(), imsSnapshot.eventsNum) + store + } + } + + private def getOrCreateInMemoryStoreSnapshot(attempt: AttemptInfoWrapper) + : InMemoryStoreSnapshot = { + if (conf.get(IMS_CHECKPOINT_ENABLED)) { + val ckpPath = new Path(logDir, attempt.logPath + EventLoggingListener.CHECKPOINT) + if (fs.exists(ckpPath)) { + try { + logInfo(s"Loading InMemoryStore checkpoint file: $ckpPath") + Utils.tryWithResource(EventLoggingListener.openEventLog(ckpPath, fs)) { in => + val objIn = serializer.deserializeStream(in) + val startNs = System.nanoTime() + val imsSnapshot = objIn.readObject[InMemoryStoreSnapshot]() + objIn.close() + val finishedNs = System.nanoTime() + val duration = TimeUnit.NANOSECONDS.toMillis(finishedNs - startNs) + logInfo(s"Loaded InMemoryStore, eventsNum=${imsSnapshot.eventsNum}, " + + s"finished=${imsSnapshot.finished}, took ${duration}ms") + // +1 for skipping SparkListenerLogStart + imsSnapshot.copy(eventsNum = imsSnapshot.eventsNum + 1) + } + } catch { + case e: Exception => + logError("Failed to load InMemoryStore checkpoint file, use log file instead", e) + InMemoryStoreSnapshot(new InMemoryStore, 0, false) + } + } else { + InMemoryStoreSnapshot(new InMemoryStore, 0, false) + } + } else { + InMemoryStoreSnapshot(new InMemoryStore, 0, false) + } } private def loadPlugins(): Iterable[AppHistoryServerPlugin] = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index 3e6a4e9810664..5493406e2d718 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -19,8 +19,30 @@ package org.apache.spark.internal.config import java.util.concurrent.TimeUnit +import org.apache.spark.network.util.ByteUnit + private[spark] object Status { + val IMS_CHECKPOINT_ENABLED = + ConfigBuilder("spark.appStateStore.checkpoint.enabled") + .doc("Whether to checkpoint InMemoryStore in a live AppStatusListener, in order to " + + "accelerate the startup speed of History Server.") + .booleanConf + .createWithDefault(false) + + val IMS_CHECKPOINT_BATCH_SIZE = + ConfigBuilder("spark.appStateStore.ims.checkpoint.batchSize") + .doc("The minimal batch size to trigger the checkpoint for InMemoryStore.") + .intConf + .createWithDefault(5000) + + val IMS_CHECKPOINT_BUFFER_SIZE = + ConfigBuilder("spark.appStateStore.ims.checkpoint.bufferSize") + .doc("Buffer size to use when 
checkpointing the InMemoryStore to output streams, " + "in KiB unless otherwise specified.") + .bytesConf(ByteUnit.KiB) + .createWithDefaultString("100k") + val ASYNC_TRACKING_ENABLED = ConfigBuilder("spark.appStateStore.asyncTracking.enable") .booleanConf .createWithDefault(true) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 48eb2da3015f8..d1c1f0df68df4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -345,6 +345,7 @@ private[spark] class EventLoggingListener( } private[spark] object EventLoggingListener extends Logging { + val CHECKPOINT = ".checkpoint" // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" @@ -430,7 +431,7 @@ private[spark] object EventLoggingListener extends Logging { def codecName(log: Path): Option[String] = { // Compression codec is encoded as an extension, e.g. app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) + val logName = log.getName.stripSuffix(CHECKPOINT).stripSuffix(IN_PROGRESS) logName.split("\\.").tail.lastOption } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 226c23733c870..0315379af0768 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -32,7 +32,8 @@ import org.apache.spark.util.JsonProtocol /** * A SparkListenerBus that can be used to replay events from serialized event data. */ -private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { +private[spark] class ReplayListenerBus(eventSkipNum: Int = 0) + extends SparkListenerBus with Logging { /** * Replay each event in the order maintained in the given stream.
The stream is expected to @@ -75,7 +76,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { try { val lineEntries = lines .zipWithIndex - .filter { case (line, _) => eventsFilter(line) } + .filter { case (line, index) => index >= eventSkipNum && eventsFilter(line) } while (lineEntries.hasNext) { try { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index e340b378e01e1..08379dc2d66bc 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -17,22 +17,30 @@ package org.apache.spark.status +import java.io.BufferedOutputStream import java.util.Date -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap +import org.apache.hadoop.fs.{FileUtil, Path} + import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.CPUS_PER_TASK +import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Status._ import org.apache.spark.scheduler._ +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.status.api.v1 +import org.apache.spark.status.api.v1.StageStatus import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore.InMemoryStore /** * A Spark listener that writes application information to a data store. The types written to the @@ -46,7 +54,8 @@ private[spark] class AppStatusListener( conf: SparkConf, live: Boolean, appStatusSource: Option[AppStatusSource] = None, - lastUpdateTime: Option[Long] = None) extends SparkListener with Logging { + lastUpdateTime: Option[Long] = None, + initLiveEntitiesFromStore: Boolean = false) extends SparkListener with Logging { private var sparkVersion = SPARK_VERSION private var appInfo: v1.ApplicationInfo = null @@ -85,6 +94,67 @@ private[spark] class AppStatusListener( /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. 
*/ private var lastFlushTimeNs = System.nanoTime() + private val imsCheckpoint = createImsCheckpoint() + + private def createImsCheckpoint(): Option[InMemoryStoreCheckpoint] = { + if (live && conf.get(EVENT_LOG_ENABLED) && conf.get(IMS_CHECKPOINT_ENABLED)) { + Some(new InMemoryStoreCheckpoint( + kvstore.store.asInstanceOf[InMemoryStore], + conf, + this)) + } else { None } + } + + initLiveEntities() + + private def initLiveEntities(): Unit = { + if (!live && initLiveEntitiesFromStore) { + val imsStore = kvstore.store + imsStore.view(classOf[JobDataWrapper]) + .asScala.filter(_.info.status == JobExecutionStatus.RUNNING) + .map(_.toLiveJob).map(job => liveJobs.put(job.jobId, job)) + + imsStore.view(classOf[StageDataWrapper]).asScala.filter(_.info.status == StageStatus.ACTIVE) + .map { stageData => + val stageId = stageData.info.stageId + val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq + stageData.toLiveStage(jobs) + }.map { stage => + val stageId = stage.info.stageId + val stageAttempt = stage.info.attemptNumber() + liveStages.put((stageId, stageAttempt), stage) + + imsStore.view(classOf[ExecutorStageSummaryWrapper]).asScala + .filter { esummary => + esummary.stageId == stageId && esummary.stageAttemptId == stageAttempt } + .map(_.toLiveExecutorStageSummary) + .map { esummary => + stage.executorSummaries.put(esummary.executorId, esummary) + } + + imsStore.view(classOf[TaskDataWrapper]) + .parent(Array(stageId, stageAttempt)) + .index(TaskIndexNames.STATUS) + .first(TaskState.RUNNING.toString) + .last(TaskState.RUNNING.toString) + .closeableIterator().asScala + .map(_.toLiveTask) + .map(task => liveTasks.put(task.info.taskId, task)) + } + imsStore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) + .map(_.toLiveExecutor).map(exec => liveExecutors.put(exec.executorId, exec)) + imsStore.view(classOf[RDDStorageInfoWrapper]).asScala + .map { rddWrapper => + val liveRdd = rddWrapper.toLiveRDD(liveExecutors) + liveRDDs.put(liveRdd.info.id, liveRdd) + } + imsStore.view(classOf[PoolData]).asScala.map { poolData => + val schedulerPool = poolData.toSchedulerPool + pools.put(schedulerPool.name, schedulerPool) + } + } + } + kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) { count => cleanupExecutors(count) } @@ -104,7 +174,9 @@ private[spark] class AppStatusListener( } override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerLogStart(version) => sparkVersion = version + case SparkListenerLogStart(version) => + sparkVersion = version + case _ => } @@ -143,6 +215,8 @@ private[spark] class AppStatusListener( update(d, System.nanoTime()) } } + imsCheckpoint.foreach(_.appInfo = appInfo) + imsCheckpoint.foreach(_.eventInc()) } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { @@ -164,7 +238,9 @@ private[spark] class AppStatusListener( coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt) .getOrElse(coresPerTask) + imsCheckpoint.foreach(_.await()) kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo)) + imsCheckpoint.foreach(_.eventInc()) } override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { @@ -187,7 +263,9 @@ private[spark] class AppStatusListener( None, None, Seq(attempt)) + imsCheckpoint.foreach(_.await()) kvstore.write(new ApplicationInfoWrapper(appInfo)) + imsCheckpoint.foreach(_.eventInc(true)) } override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { @@ -201,6 +279,7 @@ 
private[spark] class AppStatusListener( exec.executorLogs = event.executorInfo.logUrlMap exec.attributes = event.executorInfo.attributes liveUpdate(exec, System.nanoTime()) + imsCheckpoint.foreach(_.eventInc()) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { @@ -250,6 +329,7 @@ private[spark] class AppStatusListener( deadExecutors.put(event.executorId, exec) } } + imsCheckpoint.foreach(_.eventInc()) } /** Was the specified executor active for any currently live stages? */ @@ -261,6 +341,7 @@ private[spark] class AppStatusListener( override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { updateBlackListStatus(event.executorId, true) + imsCheckpoint.foreach(_.eventInc()) } override def onExecutorBlacklistedForStage( @@ -273,6 +354,7 @@ private[spark] class AppStatusListener( liveExecutors.get(event.executorId).foreach { exec => addBlackListedStageTo(exec, event.stageId, now) } + imsCheckpoint.foreach(_.eventInc()) } override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = { @@ -286,6 +368,7 @@ private[spark] class AppStatusListener( liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec => addBlackListedStageTo(exec, event.stageId, now) } + imsCheckpoint.foreach(_.eventInc()) } private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = { @@ -305,14 +388,17 @@ private[spark] class AppStatusListener( override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { updateBlackListStatus(event.executorId, false) + imsCheckpoint.foreach(_.eventInc()) } override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { updateNodeBlackList(event.hostId, true) + imsCheckpoint.foreach(_.eventInc()) } override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { updateNodeBlackList(event.hostId, false) + imsCheckpoint.foreach(_.eventInc()) } private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { @@ -388,8 +474,10 @@ private[spark] class AppStatusListener( graph.outgoingEdges, graph.incomingEdges, newRDDOperationCluster(graph.rootCluster)) + imsCheckpoint.foreach(_.await()) kvstore.write(uigraph) } + imsCheckpoint.foreach(_.eventInc()) } private def newRDDOperationCluster(cluster: RDDOperationCluster): RDDOperationClusterWrapper = { @@ -459,9 +547,11 @@ private[spark] class AppStatusListener( update(job, now, last = true) if (job.status == JobExecutionStatus.SUCCEEDED) { appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) + imsCheckpoint.foreach(_.await()) kvstore.write(appSummary) } } + imsCheckpoint.foreach(_.eventInc()) } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { @@ -499,6 +589,7 @@ private[spark] class AppStatusListener( } liveUpdate(stage, now) + imsCheckpoint.foreach(_.eventInc()) } override def onTaskStart(event: SparkListenerTaskStart): Unit = { @@ -535,6 +626,7 @@ private[spark] class AppStatusListener( exec.totalTasks += 1 maybeUpdate(exec, now) } + imsCheckpoint.foreach(_.eventInc()) } override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { @@ -543,6 +635,7 @@ private[spark] class AppStatusListener( liveTasks.get(event.taskInfo.taskId).foreach { task => maybeUpdate(task, System.nanoTime()) } + imsCheckpoint.foreach(_.eventInc()) } override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { @@ -691,6 +784,7 @@ private[spark] class AppStatusListener( maybeUpdate(exec, now) } 
} + imsCheckpoint.foreach(_.eventInc()) } override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { @@ -747,12 +841,14 @@ private[spark] class AppStatusListener( } if (stage.status == v1.StageStatus.COMPLETE) { appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) + imsCheckpoint.foreach(_.await()) kvstore.write(appSummary) } } // remove any dead executors that were not running for any currently active stages deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec)) + imsCheckpoint.foreach(_.eventInc()) } private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = { @@ -772,9 +868,11 @@ private[spark] class AppStatusListener( exec.isActive = true exec.maxMemory = event.maxMem liveUpdate(exec, System.nanoTime()) + imsCheckpoint.foreach(_.eventInc()) } override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { + imsCheckpoint.foreach(_.eventInc()) // Nothing to do here. Covered by onExecutorRemoved. } @@ -810,7 +908,9 @@ private[spark] class AppStatusListener( } } + imsCheckpoint.foreach(_.await()) kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) + imsCheckpoint.foreach(_.eventInc()) } override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { @@ -874,10 +974,15 @@ private[spark] class AppStatusListener( case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) case _ => } + imsCheckpoint.foreach { ims => + if (conf.get(EVENT_LOG_BLOCK_UPDATES)) { + ims.eventInc() + } + } } /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ - private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { + private[spark] def flush(entityFlushFunc: LiveEntity => Unit): Unit = { liveStages.values.asScala.foreach { stage => entityFlushFunc(stage) stage.executorSummaries.values.foreach(entityFlushFunc) @@ -1024,8 +1129,10 @@ private[spark] class AppStatusListener( storageLevel.deserialized, event.blockUpdatedInfo.memSize, event.blockUpdatedInfo.diskSize) + imsCheckpoint.foreach(_.await()) kvstore.write(data) } else { + imsCheckpoint.foreach(_.await()) kvstore.delete(classOf[StreamBlockData], Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId)) } @@ -1085,13 +1192,15 @@ private[spark] class AppStatusListener( } } - private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = { + private[spark] def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = { + imsCheckpoint.foreach(_.await()) entity.write(kvstore, now, checkTriggers = last) } /** Update a live entity only if it hasn't been updated in the last configured period. 
*/ private def maybeUpdate(entity: LiveEntity, now: Long): Unit = { - if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { + if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs && + (imsCheckpoint.isEmpty || imsCheckpoint.get.isDone)) { update(entity, now) } } @@ -1180,6 +1289,8 @@ private[spark] class AppStatusListener( } private def cleanupTasks(stage: LiveStage): Unit = { + if (imsCheckpoint.isDefined && !imsCheckpoint.get.isDone) return + val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt if (countToDelete > 0) { val stageKey = Array(stage.info.stageId, stage.info.attemptNumber) diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index 38cb030297c81..83a0fd1e18d53 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -48,7 +48,7 @@ import org.apache.spark.util.kvstore._ * The configured triggers are run on a separate thread by default; they can be forced to run on * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. */ -private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { +private[spark] class ElementTrackingStore(val store: KVStore, conf: SparkConf) extends KVStore { private class LatchedTriggers(val triggers: Seq[Trigger[_]]) { private val pending = new AtomicBoolean(false) diff --git a/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala new file mode 100644 index 0000000000000..2b19e8c0b77df --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status + +import java.io.BufferedOutputStream +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import org.apache.hadoop.fs.{FileUtil, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Status._ +import org.apache.spark.scheduler.EventLoggingListener +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.status.api.v1 +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore.InMemoryStore + +case class InMemoryStoreSnapshot( + store: InMemoryStore, + eventsNum: Int, + finished: Boolean) extends Serializable + + +private[spark] class InMemoryStoreCheckpoint( + store: InMemoryStore, + conf: SparkConf, + listener: AppStatusListener) extends Logging { + var lastRecordEventsNum: Int = 0 + var finished: Boolean = false + + // used to count the number of processed events in a live AppStatusListener + private var processedEventsNum = 0 + private val batchSize = conf.get(IMS_CHECKPOINT_BATCH_SIZE) + private val bufferSize = conf.get(IMS_CHECKPOINT_BUFFER_SIZE).toInt + private var latch = new CountDownLatch(0) + @volatile var isDone = true + // a JavaSerializer used to serialize InMemoryStoreSnapshot + private val serializer = new JavaSerializer(conf).newInstance() + private val logBaseDir = Utils.resolveURI(conf.get(EVENT_LOG_DIR).stripSuffix("/")) + private lazy val ckpPath = getCheckpointPath + private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + private val executor = ThreadUtils.newDaemonSingleThreadExecutor( + "inmemorystore-checkpoint-thread") + // should be initialized before the first checkpoint + var appInfo: v1.ApplicationInfo = _ + + private val checkpointTask = new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError(doCheckpoint()) + } + + private def getCheckpointPath: String = { + val appId = appInfo.id + val appAttemptId = appInfo.attempts.head.attemptId + EventLoggingListener.getLogPath(logBaseDir, appId, appAttemptId, None) + } + + def await(): Unit = latch.await() + + def eventInc(finish: Boolean = false): Unit = { + processedEventsNum += 1 + val shouldCheckpoint = isDone && (finish || processedEventsNum - lastRecordEventsNum >= + batchSize) + if (shouldCheckpoint) { + // flush to ensure all data of the processed events has been written into the InMemoryStore + listener.flush(listener.update(_, System.nanoTime())) + latch = new CountDownLatch(1) + lastRecordEventsNum = processedEventsNum + if (finish) { + finished = true + doCheckpoint() + executor.shutdown() + } else { + executor.submit(checkpointTask) + } + } + } + + private def doCheckpoint(): Unit = { + try { + isDone = false + if (appInfo == null) { + logWarning("Haven't received SparkListenerApplicationStart yet, skipping checkpoint") + } else { + logInfo(s"Checkpoint InMemoryStore started, eventsNum=$processedEventsNum") + assert(appInfo != null, "appInfo is null") + val uri = new Path(ckpPath).toUri + val ckpFile = new Path(uri.getPath + + (if (!finished) EventLoggingListener.IN_PROGRESS else "") + + EventLoggingListener.CHECKPOINT) + val tmpFile = new Path(ckpFile + ".tmp") + val fileOut = fileSystem.create(tmpFile) + val bufferOut = new BufferedOutputStream(fileOut, bufferSize) + val objOut = serializer.serializeStream(bufferOut) + val startNs =
System.nanoTime() + objOut.writeObject(InMemoryStoreSnapshot(store, lastRecordEventsNum, finished)) + fileOut.flush() + objOut.close() + FileUtil.copy(fileSystem, tmpFile, fileSystem, ckpFile, true, hadoopConf) + if (finished) { + val inProgressCkpFile = new Path(uri.getPath + EventLoggingListener.IN_PROGRESS + + EventLoggingListener.CHECKPOINT) + if (!fileSystem.delete(inProgressCkpFile, true)) { + logWarning(s"Failed to delete $inProgressCkpFile") + } + if (fileSystem.exists(tmpFile) && !fileSystem.delete(tmpFile, true)) { + logWarning(s"Failed to delete $tmpFile") + } + } + val finishedNs = System.nanoTime() + val duration = TimeUnit.NANOSECONDS.toMillis(finishedNs - startNs) + logInfo(s"Checkpoint InMemoryStore finished, took $duration ms") + } + } finally { + latch.countDown() + isDone = true + } + } +} diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index c5a233f14aa6d..1afeb370b7242 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -58,7 +58,7 @@ private[spark] abstract class LiveEntity { } -private class LiveJob( +private[spark] class LiveJob( val jobId: Int, name: String, val submissionTime: Option[Date], @@ -73,6 +73,8 @@ private class LiveJob( // Holds both the stage ID and the task index, packed into a single long value. val completedIndices = new OpenHashSet[Long]() + // will only be set when InMemoryStore checkpoint is enabled. + var completedIndicesNum = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -84,6 +86,8 @@ private class LiveJob( var completionTime: Option[Date] = None var completedStages: Set[Int] = Set() + // will only be set when InMemoryStore checkpoint is enabled. 
+ var completedStagesNum = 0 var activeStages = 0 var failedStages = 0 @@ -103,9 +107,9 @@ private class LiveJob( skippedTasks, failedTasks, killedTasks, - completedIndices.size, + completedIndices.size + completedIndicesNum, activeStages, - completedStages.size, + completedStages.size + completedStagesNum, skippedStages.size, failedStages, killedSummary) @@ -114,7 +118,7 @@ private class LiveJob( } -private class LiveTask( +private[spark] class LiveTask( var info: TaskInfo, stageId: Int, stageAttemptId: Int, @@ -228,7 +232,7 @@ private class LiveTask( } -private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { +private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { var hostPort: String = null var host: String = null @@ -270,7 +274,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE def hasMemoryInfo: Boolean = totalOnHeap >= 0L // peak values for executor level metrics - val peakExecutorMetrics = new ExecutorMetrics() + var peakExecutorMetrics = new ExecutorMetrics() def hostname: String = if (host != null) host else hostPort.split(":")(0) @@ -313,10 +317,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE } } -private class LiveExecutorStageSummary( +private[spark] class LiveExecutorStageSummary( stageId: Int, attemptId: Int, - executorId: String) extends LiveEntity { + val executorId: String) extends LiveEntity { import LiveEntityHelpers._ @@ -350,7 +354,7 @@ private class LiveExecutorStageSummary( } -private class LiveStage extends LiveEntity { +private[spark] class LiveStage extends LiveEntity { import LiveEntityHelpers._ @@ -367,6 +371,8 @@ private class LiveStage extends LiveEntity { var completedTasks = 0 var failedTasks = 0 val completedIndices = new OpenHashSet[Int]() + // will only be set when InMemoryStore checkpoint is enabled. 
+ var completedIndicesNum = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -377,9 +383,9 @@ private class LiveStage extends LiveEntity { var metrics = createMetrics(default = 0L) - val executorSummaries = new HashMap[String, LiveExecutorStageSummary]() + var executorSummaries = new HashMap[String, LiveExecutorStageSummary]() - val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0) + var activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0) var blackListedExecutors = new HashSet[String]() @@ -402,7 +408,7 @@ private class LiveStage extends LiveEntity { numCompleteTasks = completedTasks, numFailedTasks = failedTasks, numKilledTasks = killedTasks, - numCompletedIndices = completedIndices.size, + numCompletedIndices = completedIndices.size + completedIndicesNum, submissionTime = info.submissionTime.map(new Date(_)), firstTaskLaunchedTime = @@ -455,7 +461,7 @@ private class LiveStage extends LiveEntity { } -private class LiveRDDPartition(val blockName: String) { +private[spark] class LiveRDDPartition(val blockName: String) extends Serializable { import LiveEntityHelpers._ @@ -486,7 +492,7 @@ private class LiveRDDPartition(val blockName: String) { } -private class LiveRDDDistribution(exec: LiveExecutor) { +private[spark] class LiveRDDDistribution(exec: LiveExecutor) { import LiveEntityHelpers._ @@ -503,6 +509,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { def toApi(): v1.RDDDataDistribution = { if (lastUpdate == null) { lastUpdate = new v1.RDDDataDistribution( + executorId, weakIntern(exec.hostPort), memoryUsed, exec.maxMemory - exec.memoryUsed, @@ -517,7 +524,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { } -private class LiveRDD(val info: RDDInfo) extends LiveEntity { +private[spark] class LiveRDD(val info: RDDInfo) extends LiveEntity { import LiveEntityHelpers._ @@ -525,10 +532,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { var memoryUsed = 0L var diskUsed = 0L - private val partitions = new HashMap[String, LiveRDDPartition]() - private val partitionSeq = new RDDPartitionSeq() + private[spark] val partitions = new HashMap[String, LiveRDDPartition]() + private[spark] val partitionSeq = new RDDPartitionSeq() - private val distributions = new HashMap[String, LiveRDDDistribution]() + private[spark] val distributions = new HashMap[String, LiveRDDDistribution]() def setStorageLevel(level: String): Unit = { this.storageLevel = weakIntern(level) @@ -586,7 +593,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { } -private class SchedulerPool(name: String) extends LiveEntity { +private[spark] class SchedulerPool(val name: String) extends LiveEntity { var stageIds = Set[Int]() @@ -736,7 +743,7 @@ private object LiveEntityHelpers { * Internally, the sequence is mutable, and elements can modify the data they expose. Additions and * removals are O(1). It is not safe to do multiple writes concurrently. 
*/ -private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { +private[spark] class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] with Serializable { @volatile private var _head: LiveRDDPartition = null @volatile private var _tail: LiveRDDPartition = null diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 7b3636fdc5b47..4bd4d4cd649cd 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.metrics.ExecutorMetricType +import org.apache.spark.status.{LiveExecutor, LiveRDDDistribution, LiveRDDPartition} case class ApplicationInfo private[spark]( id: String, @@ -76,7 +77,7 @@ class ExecutorStageSummary private[spark]( val shuffleWriteRecords : Long, val memoryBytesSpilled : Long, val diskBytesSpilled : Long, - val isBlacklistedForStage: Boolean) + val isBlacklistedForStage: Boolean) extends Serializable class ExecutorSummary private[spark]( val id: String, @@ -107,13 +108,13 @@ class ExecutorSummary private[spark]( @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) val peakMemoryMetrics: Option[ExecutorMetrics], - val attributes: Map[String, String]) + val attributes: Map[String, String]) extends Serializable class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, - val totalOffHeapStorageMemory: Long) + val totalOffHeapStorageMemory: Long) extends Serializable /** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ private[spark] class ExecutorMetricsJsonDeserializer @@ -165,7 +166,7 @@ class JobData private[spark]( val numCompletedStages: Int, val numSkippedStages: Int, val numFailedStages: Int, - val killedTasksSummary: Map[String, Int]) + val killedTasksSummary: Map[String, Int]) extends Serializable class RDDStorageInfo private[spark]( val id: Int, @@ -176,9 +177,10 @@ class RDDStorageInfo private[spark]( val memoryUsed: Long, val diskUsed: Long, val dataDistribution: Option[Seq[RDDDataDistribution]], - val partitions: Option[Seq[RDDPartitionInfo]]) + val partitions: Option[Seq[RDDPartitionInfo]]) extends Serializable class RDDDataDistribution private[spark]( + val executorId: String, val address: String, val memoryUsed: Long, val memoryRemaining: Long, @@ -190,14 +192,35 @@ class RDDDataDistribution private[spark]( @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryRemaining: Option[Long], @JsonDeserialize(contentAs = classOf[JLong]) - val offHeapMemoryRemaining: Option[Long]) + val offHeapMemoryRemaining: Option[Long]) extends Serializable { + + def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor]) + : LiveRDDDistribution = { + // TODO (wuyi) does the liveexecutor always exists ? 
+ val exec = executors.get(executorId).get + val liveRDDDistribution = new LiveRDDDistribution(exec) + liveRDDDistribution.memoryUsed = memoryUsed + liveRDDDistribution.diskUsed = diskUsed + liveRDDDistribution.onHeapUsed = onHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.offHeapUsed = offHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.lastUpdate = this + liveRDDDistribution + } +} class RDDPartitionInfo private[spark]( val blockName: String, val storageLevel: String, val memoryUsed: Long, val diskUsed: Long, - val executors: Seq[String]) + val executors: Seq[String]) extends Serializable { + + def toLiveRDDPartition: LiveRDDPartition = { + val liveRDDPartition = new LiveRDDPartition(blockName) + liveRDDPartition.value = this + liveRDDPartition + } +} class StageData private[spark]( val status: StageStatus, @@ -250,7 +273,7 @@ class StageData private[spark]( val accumulatorUpdates: Seq[AccumulableInfo], val tasks: Option[Map[Long, TaskData]], val executorSummary: Option[Map[String, ExecutorStageSummary]], - val killedTasksSummary: Map[String, Int]) + val killedTasksSummary: Map[String, Int]) extends Serializable class TaskData private[spark]( val taskId: Long, @@ -286,15 +309,15 @@ class TaskMetrics private[spark]( val inputMetrics: InputMetrics, val outputMetrics: OutputMetrics, val shuffleReadMetrics: ShuffleReadMetrics, - val shuffleWriteMetrics: ShuffleWriteMetrics) + val shuffleWriteMetrics: ShuffleWriteMetrics) extends Serializable class InputMetrics private[spark]( val bytesRead: Long, - val recordsRead: Long) + val recordsRead: Long) extends Serializable class OutputMetrics private[spark]( val bytesWritten: Long, - val recordsWritten: Long) + val recordsWritten: Long) extends Serializable class ShuffleReadMetrics private[spark]( val remoteBlocksFetched: Long, @@ -303,12 +326,12 @@ class ShuffleReadMetrics private[spark]( val remoteBytesRead: Long, val remoteBytesReadToDisk: Long, val localBytesRead: Long, - val recordsRead: Long) + val recordsRead: Long) extends Serializable class ShuffleWriteMetrics private[spark]( val bytesWritten: Long, val writeTime: Long, - val recordsWritten: Long) + val recordsWritten: Long) extends Serializable class TaskMetricDistributions private[spark]( val quantiles: IndexedSeq[Double], @@ -368,12 +391,12 @@ class ApplicationEnvironmentInfo private[spark] ( val sparkProperties: Seq[(String, String)], val hadoopProperties: Seq[(String, String)], val systemProperties: Seq[(String, String)], - val classpathEntries: Seq[(String, String)]) + val classpathEntries: Seq[(String, String)]) extends Serializable class RuntimeInfo private[spark]( val javaVersion: String, val javaHome: String, - val scalaVersion: String) + val scalaVersion: String) extends Serializable case class StackTrace(elems: Seq[String]) { override def toString: String = elems.mkString diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9da5bea8bf5c4..d60233406350e 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -17,27 +17,35 @@ package org.apache.spark.status +import java.io.{ObjectInputStream, ObjectOutputStream} import java.lang.{Long => JLong} import java.util.Date +import scala.collection.mutable.ArrayBuffer + import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import org.apache.spark.executor.ExecutorMetrics +import 
org.apache.spark.scheduler.{StageInfo, TaskInfo, TaskLocality} import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1._ +import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.scope._ +import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.KVIndex private[spark] case class AppStatusStoreMetadata(version: Long) -private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { +private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) extends Serializable { @JsonIgnore @KVIndex def id: String = info.id } -private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvironmentInfo) { +private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvironmentInfo) + extends Serializable { /** * There's always a single ApplicationEnvironmentInfo object per application, so this @@ -48,7 +56,37 @@ private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvi } -private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { +private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) extends Serializable { + + def toLiveExecutor: LiveExecutor = { + val liveExecutor = new LiveExecutor(info.id, info.addTime.getTime) + liveExecutor.hostPort = info.hostPort + liveExecutor.totalCores = info.totalCores + liveExecutor.rddBlocks = info.rddBlocks + liveExecutor.memoryUsed = info.memoryUsed + liveExecutor.diskUsed = info.diskUsed + liveExecutor.maxTasks = info.maxTasks + liveExecutor.maxMemory = info.maxMemory + liveExecutor.totalTasks = info.totalTasks + liveExecutor.activeTasks = info.activeTasks + liveExecutor.completedTasks = info.completedTasks + liveExecutor.failedTasks = info.failedTasks + liveExecutor.totalDuration = info.totalDuration + liveExecutor.totalGcTime = info.totalGCTime + liveExecutor.totalInputBytes = info.totalInputBytes + liveExecutor.totalShuffleRead = info.totalShuffleRead + liveExecutor.totalShuffleWrite = info.totalShuffleWrite + liveExecutor.isBlacklisted = info.isBlacklisted + liveExecutor.blacklistedInStages = info.blacklistedInStages + liveExecutor.executorLogs = info.executorLogs + liveExecutor.attributes = info.attributes + liveExecutor.totalOnHeap = info.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(-1) + liveExecutor.totalOffHeap = info.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0) + liveExecutor.usedOnHeap = info.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0) + liveExecutor.usedOffHeap = info.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0) + liveExecutor.peakExecutorMetrics = info.peakMemoryMetrics.getOrElse(new ExecutorMetrics()) + liveExecutor + } @JsonIgnore @KVIndex private def id: String = info.id @@ -69,7 +107,30 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { private[spark] class JobDataWrapper( val info: JobData, val skippedStages: Set[Int], - val sqlExecutionId: Option[Long]) { + val sqlExecutionId: Option[Long]) extends Serializable { + + def toLiveJob: LiveJob = { + val liveJob = new LiveJob( + info.jobId, + info.name, + info.submissionTime, + info.stageIds, + info.jobGroup, + info.numTasks, + sqlExecutionId) + liveJob.activeTasks = info.numActiveTasks + liveJob.completedTasks = info.numCompletedTasks + liveJob.failedTasks = info.numFailedTasks + liveJob.completedIndicesNum = info.numCompletedIndices + liveJob.killedTasks = info.numKilledTasks + liveJob.killedSummary = info.killedTasksSummary + liveJob.skippedTasks = 
info.numSkippedTasks + liveJob.skippedStages = skippedStages + liveJob.completedStagesNum = info.numCompletedStages + liveJob.activeStages = info.numActiveStages + liveJob.failedStages = info.numFailedStages + liveJob + } @JsonIgnore @KVIndex private def id: Int = info.jobId @@ -82,7 +143,66 @@ private[spark] class StageDataWrapper( val info: StageData, val jobIds: Set[Int], @JsonDeserialize(contentAs = classOf[JLong]) - val locality: Map[String, Long]) { + val locality: Map[String, Long]) extends Serializable { + + def toLiveStage(jobs: Seq[LiveJob]): LiveStage = { + val liveStage = new LiveStage + val firstLaunchTime = if (info.firstTaskLaunchedTime.isEmpty) { + Long.MaxValue + } else { + info.firstTaskLaunchedTime.get.getTime + } + val metrics = LiveEntityHelpers.createMetrics( + info.executorDeserializeTime, + info.executorDeserializeCpuTime, + info.executorRunTime, + info.executorCpuTime, + info.resultSize, + info.jvmGcTime, + info.resultSerializationTime, + info.memoryBytesSpilled, + info.diskBytesSpilled, + info.peakExecutionMemory, + info.inputBytes, + info.inputRecords, + info.outputBytes, + info.outputRecords, + info.shuffleRemoteBlocksFetched, + info.shuffleLocalBlocksFetched, + info.shuffleFetchWaitTime, + info.shuffleRemoteBytesRead, + info.shuffleRemoteBytesReadToDisk, + info.shuffleLocalBytesRead, + info.shuffleReadRecords, + info.shuffleWriteBytes, + info.shuffleWriteTime, + info.shuffleWriteRecords + ) + val stageInfo = new StageInfo( + info.stageId, + info.attemptId, + info.name, + info.numTasks, + Nil, + Nil, + info.details) + liveStage.jobs = jobs + liveStage.jobIds = jobs.map(_.jobId).toSet + liveStage.info = stageInfo + liveStage.status = info.status + liveStage.description = info.description + liveStage.schedulingPool = info.schedulingPool + liveStage.activeTasks = info.numActiveTasks + liveStage.completedTasks = info.numCompleteTasks + liveStage.failedTasks = info.numFailedTasks + liveStage.completedIndicesNum = info.numCompletedIndices + liveStage.killedTasks = info.numKilledTasks + liveStage.killedSummary = info.killedTasksSummary + liveStage.firstLaunchTime = firstLaunchTime + liveStage.localitySummary = locality + liveStage.metrics = metrics + liveStage + } @JsonIgnore @KVIndex private[this] val id: Array[Int] = Array(info.stageId, info.attemptId) @@ -231,7 +351,7 @@ private[spark] class TaskDataWrapper( val shuffleRecordsWritten: Long, val stageId: Int, - val stageAttemptId: Int) { + val stageAttemptId: Int) extends Serializable { def hasMetrics: Boolean = executorDeserializeTime >= 0 @@ -290,6 +410,23 @@ private[spark] class TaskDataWrapper( gettingResultTime = 0L) } + def toLiveTask: LiveTask = { + val taskInfo = + new TaskInfo( + taskId, + index, + attempt, + launchTime, + executorId, + host, + TaskLocality.withName(taskLocality), + speculative) + taskInfo.gettingResultTime = gettingResultTime + val lastUpdateTime = duration + launchTime + val liveTask = new LiveTask(taskInfo, stageId, stageAttemptId, Some(lastUpdateTime)) + liveTask + } + @JsonIgnore @KVIndex(TaskIndexNames.STAGE) private def stage: Array[Int] = Array(stageId, stageAttemptId) @@ -352,7 +489,30 @@ private[spark] class TaskDataWrapper( private def completionTime: Long = launchTime + duration } -private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { +private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) extends Serializable { + + def toLiveRDD(executors: scala.collection.Map[String, LiveExecutor]): LiveRDD = { + val rddInfo = new RDDInfo( + info.id, + info.name, 
+ info.numPartitions, + StorageLevel.fromDescription(info.storageLevel), + false, + Nil) + val liveRDD = new LiveRDD(rddInfo) + liveRDD.memoryUsed = info.memoryUsed + liveRDD.diskUsed = info.diskUsed + info.partitions.get.foreach { rddPartition => + val liveRDDPartition = rddPartition.toLiveRDDPartition + liveRDD.partitions.put(rddPartition.blockName, liveRDDPartition) + liveRDD.partitionSeq.addPartition(liveRDDPartition) + } + info.dataDistribution.get.foreach { rddDist => + val liveRDDDist = rddDist.toLiveRDDDistribution(executors) + liveRDD.distributions.put(liveRDDDist.executorId, liveRDDDist) + } + liveRDD + } @JsonIgnore @KVIndex def id: Int = info.id @@ -366,7 +526,42 @@ private[spark] class ExecutorStageSummaryWrapper( val stageId: Int, val stageAttemptId: Int, val executorId: String, - val info: ExecutorStageSummary) { + val info: ExecutorStageSummary) extends Serializable { + + def toLiveExecutorStageSummary: LiveExecutorStageSummary = { + val liveESSummary = new LiveExecutorStageSummary(stageId, stageAttemptId, executorId) + val metrics = LiveEntityHelpers.createMetrics( + executorDeserializeTime = 0, + executorDeserializeCpuTime = 0, + executorRunTime = 0, + executorCpuTime = 0, + resultSize = 0, + jvmGcTime = 0, + resultSerializationTime = 0, + memoryBytesSpilled = info.memoryBytesSpilled, + diskBytesSpilled = info.diskBytesSpilled, + peakExecutionMemory = 0, + inputBytesRead = info.inputBytes, + inputRecordsRead = info.inputRecords, + outputBytesWritten = info.outputBytes, + outputRecordsWritten = info.outputRecords, + shuffleRemoteBlocksFetched = 0, + shuffleLocalBlocksFetched = 0, + shuffleFetchWaitTime = 0, + shuffleRemoteBytesRead = info.shuffleRead, + shuffleRemoteBytesReadToDisk = 0, + shuffleLocalBytesRead = 0, + shuffleRecordsRead = info.shuffleReadRecords, + shuffleBytesWritten = info.shuffleWrite, + shuffleWriteTime = 0, + shuffleRecordsWritten = info.shuffleWriteRecords) + liveESSummary.taskTime = info.taskTime + liveESSummary.succeededTasks = info.succeededTasks + liveESSummary.failedTasks = info.failedTasks + liveESSummary.isBlacklisted = info.isBlacklistedForStage + liveESSummary.metrics = metrics + liveESSummary + } @JsonIgnore @KVIndex private val _id: Array[Any] = Array(stageId, stageAttemptId, executorId) @@ -388,7 +583,7 @@ private[spark] class StreamBlockData( val useDisk: Boolean, val deserialized: Boolean, val memSize: Long, - val diskSize: Long) { + val diskSize: Long) extends Serializable { @JsonIgnore @KVIndex def key: Array[String] = Array(name, executorId) @@ -396,10 +591,10 @@ private[spark] class StreamBlockData( } private[spark] class RDDOperationClusterWrapper( - val id: String, - val name: String, - val childNodes: Seq[RDDOperationNode], - val childClusters: Seq[RDDOperationClusterWrapper]) { + var id: String, + var name: String, + var childNodes: Seq[RDDOperationNode], + var childClusters: Seq[RDDOperationClusterWrapper]) extends Serializable { def toRDDOperationCluster(): RDDOperationCluster = { val isBarrier = childNodes.exists(_.barrier) @@ -412,14 +607,40 @@ private[spark] class RDDOperationClusterWrapper( cluster } + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { + out.writeUTF(id) + out.writeUTF(name) + val nodeSize = childNodes.size + out.writeInt(nodeSize) + childNodes.foreach(out.writeObject) + val clusterSize = childClusters.size + out.writeInt(clusterSize) + childClusters.foreach(out.writeObject) + } + + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { + id = 
in.readUTF() + name = in.readUTF() + val nodeSize = in.readInt() + val nodeBuffer = new ArrayBuffer[RDDOperationNode](nodeSize) + (0 until nodeSize).foreach(_ => + nodeBuffer += in.readObject().asInstanceOf[RDDOperationNode]) + childNodes = nodeBuffer + val clusterSize = in.readInt() + val clusterBuffer = new ArrayBuffer[RDDOperationClusterWrapper](clusterSize) + (0 until clusterSize).foreach(_ => + clusterBuffer += in.readObject().asInstanceOf[RDDOperationClusterWrapper]) + childClusters = clusterBuffer + } } private[spark] class RDDOperationGraphWrapper( - @KVIndexParam val stageId: Int, - val edges: Seq[RDDOperationEdge], - val outgoingEdges: Seq[RDDOperationEdge], - val incomingEdges: Seq[RDDOperationEdge], - val rootCluster: RDDOperationClusterWrapper) { + @KVIndexParam var stageId: Int, + var edges: Seq[RDDOperationEdge], + var outgoingEdges: Seq[RDDOperationEdge], + var incomingEdges: Seq[RDDOperationEdge], + var rootCluster: RDDOperationClusterWrapper) + extends Serializable { def toRDDOperationGraph(): RDDOperationGraph = { new RDDOperationGraph(edges, outgoingEdges, incomingEdges, rootCluster.toRDDOperationCluster()) @@ -429,7 +650,14 @@ private[spark] class RDDOperationGraphWrapper( private[spark] class PoolData( @KVIndexParam val name: String, - val stageIds: Set[Int]) + val stageIds: Set[Int]) extends Serializable { + + def toSchedulerPool: SchedulerPool = { + val pool = new SchedulerPool(name) + pool.stageIds = stageIds + pool + } +} /** * A class with information about an app, to be used by the UI. There's only one instance of @@ -437,7 +665,7 @@ private[spark] class PoolData( */ private[spark] class AppSummary( val numCompletedJobs: Int, - val numCompletedStages: Int) { + val numCompletedStages: Int) extends Serializable { @KVIndex def id: String = classOf[AppSummary].getName() @@ -484,7 +712,7 @@ private[spark] class CachedQuantile( val shuffleWriteBytes: Double, val shuffleWriteRecords: Double, - val shuffleWriteTime: Double) { + val shuffleWriteTime: Double) extends Serializable { @KVIndex @JsonIgnore def id: Array[Any] = Array(stageId, stageAttemptId, quantile) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4c6998d7a8e20..d9a8a3f0de3e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -163,6 +163,21 @@ object StorageLevel { val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) + + /** + * :: DeveloperApi :: + * Return the StorageLevel object with the specified description. + */ + @DeveloperApi + def fromDescription(desc: String): StorageLevel = { + val (useDisk_, useMemory_, useOffHeap_, deserialized_) = { + (desc.contains("Disk"), desc.contains("Memory"), + desc.contains("off heap"), desc.contains("Deserialized")) + } + val replication_ = desc.split(" ").takeRight(2)(0).dropRight(1).toInt + new StorageLevel(useDisk_, useMemory_, useOffHeap_, deserialized_, replication_) + } + /** * :: DeveloperApi :: * Return the StorageLevel object with the specified name. 
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 9ace324322947..1e61b9cfa33df 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -47,13 +47,13 @@ private[spark] case class RDDOperationNode( name: String, cached: Boolean, barrier: Boolean, - callsite: String) + callsite: String) extends Serializable /** * A directed edge connecting two nodes in an RDDOperationGraph. * This represents an RDD dependency. */ -private[spark] case class RDDOperationEdge(fromId: Int, toId: Int) +private[spark] case class RDDOperationEdge(fromId: Int, toId: Int) extends Serializable /** * A cluster that groups nodes together in an RDDOperationGraph. @@ -64,7 +64,7 @@ private[spark] case class RDDOperationEdge(fromId: Int, toId: Int) private[spark] class RDDOperationCluster( val id: String, val barrier: Boolean, - private var _name: String) { + private var _name: String) extends Serializable { private val _childNodes = new ListBuffer[RDDOperationNode] private val _childClusters = new ListBuffer[RDDOperationCluster]
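Usage sketch (not part of the patch): a minimal, hypothetical app-side configuration that would exercise the new checkpoint path. The config keys and defaults come from the Status.scala changes above; createImsCheckpoint() only activates for a live listener when both spark.eventLog.enabled and spark.appStateStore.checkpoint.enabled are set, and the event log directory is assumed to be the one the History Server reads.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical demo app: every value below except the two patch defaults is an assumption.
    val conf = new SparkConf()
      .setMaster("local[2]")                                        // local master, for the sketch only
      .setAppName("ims-checkpoint-demo")
      .set("spark.eventLog.enabled", "true")                        // checkpointing requires event logging
      .set("spark.eventLog.dir", "hdfs:///spark-events")            // assumed SHS log directory
      .set("spark.appStateStore.checkpoint.enabled", "true")        // new config from this patch
      .set("spark.appStateStore.ims.checkpoint.batchSize", "5000")  // patch default
      .set("spark.appStateStore.ims.checkpoint.bufferSize", "100k") // patch default
    val sc = new SparkContext(conf)
    // ... run jobs; the live AppStatusListener periodically snapshots its InMemoryStore next to
    // the event log, and the SHS later loads that snapshot and replays only the remaining events.
    sc.stop()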