From 9c0d469301f77a6e629fc80a3645a499499c67ea Mon Sep 17 00:00:00 2001 From: wuyi Date: Mon, 19 Aug 2019 23:15:35 +0800 Subject: [PATCH 1/7] try --- .../spark/util/kvstore/InMemoryStore.java | 1 + .../deploy/history/FsHistoryProvider.scala | 59 ++++++-- .../apache/spark/internal/config/Status.scala | 13 ++ .../spark/scheduler/ReplayListenerBus.scala | 4 +- .../spark/status/AppStatusListener.scala | 136 +++++++++++++++++- .../spark/status/ElementTrackingStore.scala | 2 +- 6 files changed, 200 insertions(+), 15 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index 6af45aec3c7b2..5dfd101b65767 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -17,6 +17,7 @@ package org.apache.spark.util.kvstore; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5f9b18ce01279..b53a894c55dfb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{File, FileNotFoundException, IOException} +import java.io._ import java.nio.file.Files import java.util.{Date, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} @@ -30,6 +30,8 @@ import scala.io.Source import scala.util.Try import scala.xml.Node +import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy +import com.esotericsoftware.kryo.io.Input import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.MoreExecutors @@ -38,6 +40,7 @@ import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} import org.apache.hadoop.hdfs.protocol.HdfsConstants import org.apache.hadoop.security.AccessControlException import org.fusesource.leveldbjni.internal.NativeDB +import org.objenesis.strategy.{SerializingInstantiatorStrategy, StdInstantiatorStrategy} import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil @@ -50,6 +53,7 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.serializer.KryoSerializer import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} @@ -343,7 +347,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) loadDiskStore(sm, appId, attempt) case _ => - createInMemoryStore(attempt) + createInMemoryStore(appId, attempt) } } catch { case _: FileNotFoundException => @@ -441,6 +445,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. 
!entry.getPath().getName().startsWith(".") && + !entry.getPath().getName().endsWith(".ckp") && !isBlacklisted(entry.getPath) } .filter { entry => @@ -950,12 +955,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def rebuildAppStore( store: KVStore, eventLog: FileStatus, - lastUpdated: Long): Unit = { + lastUpdated: Long, + eventSkipNum: Int = 0): Unit = { // Disable async updates, since they cause higher memory usage, and it's ok to take longer // to parse the event logs in the SHS. val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false) val trackingStore = new ElementTrackingStore(store, replayConf) - val replayBus = new ReplayListenerBus() + val replayBus = new ReplayListenerBus(eventSkipNum) val listener = new AppStatusListener(trackingStore, replayConf, false, lastUpdateTime = Some(lastUpdated)) replayBus.addListener(listener) @@ -1083,11 +1089,46 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata) } - private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { - val store = new InMemoryStore() - val status = fs.getFileStatus(new Path(logDir, attempt.logPath)) - rebuildAppStore(store, status, attempt.info.lastUpdated.getTime()) - store + private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = { + val imsSnapshot = getOrCreateInMemoryStoreSnapshot(appId) + val store = imsSnapshot.store + if (imsSnapshot.finished) { + store + } else { + val status = fs.getFileStatus(new Path(logDir, attempt.logPath)) + rebuildAppStore(store, status, attempt.info.lastUpdated.getTime(), imsSnapshot.eventsNum) + store + } + } + + private def getOrCreateInMemoryStoreSnapshot(appId: String): InMemoryStoreSnapshot = { + if (conf.get(IMS_CHECKPOINT_ENABLED)) { + val path = new Path(logDir, s"ims_$appId.ckp").toUri.getPath + logInfo(s"loading $path.") + try { + val fileIn = new FileInputStream(path) +// val serializer = new KryoSerializer(conf) +// val in = serializer.newInstance().deserializeStream(fileIn) +// val imsSnapshot = in.readObject[InMemoryStoreSnapshot]() +// in.close() + val kryo = new KryoSerializer(conf).newKryo() + kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()) + val input = new Input(fileIn) + val imsSnapshot = kryo.readObject(input, classOf[InMemoryStoreSnapshot]) + input.close() + logInfo(s"Loaded InMemoryStoreSnapshot(ims, eventsSkipNum=${imsSnapshot.eventsNum}," + + s" finished=${imsSnapshot.finished}) from $path.") + imsSnapshot + } catch { + case e: Throwable => + val cause = Option(e.getCause) + logError(s"failed to load $path", cause.getOrElse(e)) + throw e + } + + } else { + InMemoryStoreSnapshot(new InMemoryStore, 0, false) + } } private def loadPlugins(): Iterable[AppHistoryServerPlugin] = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index 3e6a4e9810664..684cbcea9b9fb 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -21,6 +21,19 @@ import java.util.concurrent.TimeUnit private[spark] object Status { + val IMS_CHECKPOINT_ENABLED = + ConfigBuilder("spark.appStateStore.ims.checkpoint.enabled") + .doc("Whether to checkpoint InMemoryStore in a live AppStatusListener, in order to " + + "accelerate the startup speed of History Server.") + .booleanConf + .createWithDefault(true) + + val IMS_CHECKPOINT_BATCH_SIZE = + 
ConfigBuilder("spark.appStateStore.ims.checkpoint.batchSize") + .doc("The minimal batch size to trigger a checkpoint for InMemoryStore.") + .intConf + .createWithDefault(1000) + val ASYNC_TRACKING_ENABLED = ConfigBuilder("spark.appStateStore.asyncTracking.enable") .booleanConf .createWithDefault(true) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 226c23733c870..c5ca39cc32e55 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -32,7 +32,8 @@ import org.apache.spark.util.JsonProtocol /** * A SparkListenerBus that can be used to replay events from serialized event data. */ -private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { +private[spark] class ReplayListenerBus(eventSkipNum: Int = 0) + extends SparkListenerBus with Logging { /** * Replay each event in the order maintained in the given stream. The stream is expected to @@ -75,6 +76,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { try { val lineEntries = lines .zipWithIndex + .filter { case (_, index) => index >= eventSkipNum} .filter { case (line, _) => eventsFilter(line) } while (lineEntries.hasNext) { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index e340b378e01e1..f36adfc375fd9 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -17,22 +17,112 @@ package org.apache.spark.status +import java.io.{File, FileInputStream, FileOutputStream, ObjectOutputStream} import java.util.Date -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap +import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy +import com.esotericsoftware.kryo.io.{Input, Output} +import org.objenesis.strategy.{SerializingInstantiatorStrategy, StdInstantiatorStrategy} + import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.CPUS_PER_TASK +import org.apache.spark.internal.config.{CPUS_PER_TASK, EVENT_LOG_DIR} import org.apache.spark.internal.config.Status._ import org.apache.spark.scheduler._ +import org.apache.spark.serializer.KryoSerializer import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore.InMemoryStore + +case class InMemoryStoreSnapshot( + store: InMemoryStore, + eventsNum: Int, + finished: Boolean) + +private[spark] class InMemoryStoreCheckpoint( + store: InMemoryStore, + conf: SparkConf) extends Logging { + var lastRecordEventsNum: Int = 0 + var finished: Boolean = false + + // used to count the number of processed events in a live AppStatusListener + private var processedEventsNum = 0 + // TODO(wuyi) config + private val batchSize = conf.get(IMS_CHECKPOINT_BATCH_SIZE) + private var latch = new CountDownLatch(0) + @volatile var isDone = true + // use event log file dir directly ? 
+ private val dir = Utils.resolveURI(conf.get(EVENT_LOG_DIR).stripSuffix("/")).getPath + private val executor = ThreadUtils.newDaemonSingleThreadExecutor( + "inmemorystore-checkpoint-thread") + var appInfo: v1.ApplicationInfo = _ + private var cnt = 0 + + private val checkpointTask = new Runnable { + override def run(): Unit = doCheckpoint() + } + + def await(): Unit = latch.await() + + def eventInc(finish: Boolean = false): Unit = { + processedEventsNum += 1 + val shouldCheckpoint = !finished && (processedEventsNum - lastRecordEventsNum >= + batchSize || finish) + if (shouldCheckpoint) { + latch = new CountDownLatch(1) + lastRecordEventsNum = processedEventsNum + if (finish) { + finished = true + doCheckpoint() + executor.shutdown() + } else { + executor.submit(checkpointTask) + } + } + } + + private def doCheckpoint(): Unit = { + cnt += 1 + // TODO(wuyi) rewrite + logInfo(s"Do checkpoint, cnt=$cnt, processedEventsNum = $processedEventsNum") + isDone = false + assert(appInfo != null, "appInfo is nnnnnnnnnnnn") + // TODO(wuyi) multiple attempts ? appInto is null ? + val file = new File(dir, s"ims_${appInfo.id}.ckp") + val fileOut = new FileOutputStream(file) +// val serializer = new KryoSerializer(conf) +// val out = serializer.newInstance().serializeStream(fileOut) +// out.writeObject[InMemoryStoreSnapshot]( +// InMemoryStoreSnapshot(store, lastRecordEventsNum, finished)) +// out.flush() +// out.close() + logInfo(s"be stageNume: ${store.count(classOf[StageDataWrapper])}") + val kryo = new KryoSerializer(conf).newKryo() + kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()) + val output = new Output(fileOut) + kryo.writeObject(output, InMemoryStoreSnapshot(store, lastRecordEventsNum, finished)) + output.close() + + val fileIn = new FileInputStream(file) +// val kryo2 = new KryoSerializer(conf).newKryo() + val input = new Input(fileIn) + val imsSnapshot = kryo.readObject(input, classOf[InMemoryStoreSnapshot]) + input.close() + logInfo(s"af stageNume: ${imsSnapshot.store.count(classOf[StageDataWrapper])}") + logInfo(s"Loaded InMemoryStoreSnapshot(ims, eventsSkipNum=${imsSnapshot.eventsNum}," + + s" finished=${imsSnapshot.finished}) from ${file.getPath}.") + latch.countDown() + isDone = true + } +} /** * A Spark listener that writes application information to a data store. The types written to the @@ -85,6 +175,12 @@ private[spark] class AppStatusListener( /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. 
*/ private var lastFlushTimeNs = System.nanoTime() + private val imsCheckpoint = if (live && conf.get(IMS_CHECKPOINT_ENABLED)) { + Some(new InMemoryStoreCheckpoint(kvstore.store.asInstanceOf[InMemoryStore], conf)) + } else { + None + } + kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) { count => cleanupExecutors(count) } @@ -104,7 +200,10 @@ private[spark] class AppStatusListener( } override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerLogStart(version) => sparkVersion = version + case SparkListenerLogStart(version) => + sparkVersion = version + imsCheckpoint.foreach(_.eventInc()) + case _ => } @@ -143,6 +242,8 @@ private[spark] class AppStatusListener( update(d, System.nanoTime()) } } + imsCheckpoint.foreach(_.appInfo = appInfo) + imsCheckpoint.foreach(_.eventInc()) } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { @@ -164,7 +265,9 @@ private[spark] class AppStatusListener( coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt) .getOrElse(coresPerTask) + imsCheckpoint.foreach(_.await()) kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo)) + imsCheckpoint.foreach(_.eventInc()) } override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { @@ -187,7 +290,9 @@ private[spark] class AppStatusListener( None, None, Seq(attempt)) + imsCheckpoint.foreach(_.await()) kvstore.write(new ApplicationInfoWrapper(appInfo)) + imsCheckpoint.foreach(_.eventInc(true)) } override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { @@ -201,6 +306,7 @@ private[spark] class AppStatusListener( exec.executorLogs = event.executorInfo.logUrlMap exec.attributes = event.executorInfo.attributes liveUpdate(exec, System.nanoTime()) + imsCheckpoint.foreach(_.eventInc()) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { @@ -250,6 +356,7 @@ private[spark] class AppStatusListener( deadExecutors.put(event.executorId, exec) } } + imsCheckpoint.foreach(_.eventInc()) } /** Was the specified executor active for any currently live stages? 
*/ @@ -261,6 +368,7 @@ private[spark] class AppStatusListener( override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { updateBlackListStatus(event.executorId, true) + imsCheckpoint.foreach(_.eventInc()) } override def onExecutorBlacklistedForStage( @@ -273,6 +381,7 @@ private[spark] class AppStatusListener( liveExecutors.get(event.executorId).foreach { exec => addBlackListedStageTo(exec, event.stageId, now) } + imsCheckpoint.foreach(_.eventInc()) } override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = { @@ -286,6 +395,7 @@ private[spark] class AppStatusListener( liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec => addBlackListedStageTo(exec, event.stageId, now) } + imsCheckpoint.foreach(_.eventInc()) } private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = { @@ -305,14 +415,17 @@ private[spark] class AppStatusListener( override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { updateBlackListStatus(event.executorId, false) + imsCheckpoint.foreach(_.eventInc()) } override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { updateNodeBlackList(event.hostId, true) + imsCheckpoint.foreach(_.eventInc()) } override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { updateNodeBlackList(event.hostId, false) + imsCheckpoint.foreach(_.eventInc()) } private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { @@ -390,6 +503,7 @@ private[spark] class AppStatusListener( newRDDOperationCluster(graph.rootCluster)) kvstore.write(uigraph) } + imsCheckpoint.foreach(_.eventInc()) } private def newRDDOperationCluster(cluster: RDDOperationCluster): RDDOperationClusterWrapper = { @@ -462,6 +576,7 @@ private[spark] class AppStatusListener( kvstore.write(appSummary) } } + imsCheckpoint.foreach(_.eventInc()) } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { @@ -499,6 +614,7 @@ private[spark] class AppStatusListener( } liveUpdate(stage, now) + imsCheckpoint.foreach(_.eventInc()) } override def onTaskStart(event: SparkListenerTaskStart): Unit = { @@ -535,6 +651,7 @@ private[spark] class AppStatusListener( exec.totalTasks += 1 maybeUpdate(exec, now) } + imsCheckpoint.foreach(_.eventInc()) } override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { @@ -543,6 +660,7 @@ private[spark] class AppStatusListener( liveTasks.get(event.taskInfo.taskId).foreach { task => maybeUpdate(task, System.nanoTime()) } + imsCheckpoint.foreach(_.eventInc()) } override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { @@ -691,6 +809,7 @@ private[spark] class AppStatusListener( maybeUpdate(exec, now) } } + imsCheckpoint.foreach(_.eventInc()) } override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { @@ -753,6 +872,7 @@ private[spark] class AppStatusListener( // remove any dead executors that were not running for any currently active stages deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec)) + imsCheckpoint.foreach(_.eventInc()) } private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = { @@ -772,9 +892,11 @@ private[spark] class AppStatusListener( exec.isActive = true exec.maxMemory = event.maxMem liveUpdate(exec, System.nanoTime()) + imsCheckpoint.foreach(_.eventInc()) } override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { + imsCheckpoint.foreach(_.eventInc()) // Nothing to 
do here. Covered by onExecutorRemoved. } @@ -811,6 +933,7 @@ private[spark] class AppStatusListener( } kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) + imsCheckpoint.foreach(_.eventInc()) } override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { @@ -851,6 +974,7 @@ private[spark] class AppStatusListener( // Re-get the current system time because `flush` may be slow and `now` is stale. lastFlushTimeNs = System.nanoTime() } + imsCheckpoint.foreach(_.eventInc()) } override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { @@ -865,6 +989,7 @@ private[spark] class AppStatusListener( update(exec, now) } } + imsCheckpoint.foreach(_.eventInc()) } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { @@ -874,6 +999,7 @@ private[spark] class AppStatusListener( case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) case _ => } + imsCheckpoint.foreach(_.eventInc()) } /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ @@ -1086,12 +1212,14 @@ private[spark] class AppStatusListener( } private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = { + imsCheckpoint.foreach(_.await()) entity.write(kvstore, now, checkTriggers = last) } /** Update a live entity only if it hasn't been updated in the last configured period. */ private def maybeUpdate(entity: LiveEntity, now: Long): Unit = { - if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { + if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs && + (imsCheckpoint.isEmpty || imsCheckpoint.map(_.isDone).get)) { update(entity, now) } } diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index 38cb030297c81..83a0fd1e18d53 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -48,7 +48,7 @@ import org.apache.spark.util.kvstore._ * The configured triggers are run on a separate thread by default; they can be forced to run on * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. 
*/ -private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { +private[spark] class ElementTrackingStore(val store: KVStore, conf: SparkConf) extends KVStore { private class LatchedTriggers(val triggers: Seq[Trigger[_]]) { private val pending = new AtomicBoolean(false) From 62df9134b2645ca3bafa61da49b3da1b10e9c742 Mon Sep 17 00:00:00 2001 From: wuyi Date: Sun, 25 Aug 2019 22:56:25 +0800 Subject: [PATCH 2/7] inmemorypont checkpoint --- .../spark/util/kvstore/ArrayWrappers.java | 14 +- .../spark/util/kvstore/InMemoryStore.java | 14 +- .../apache/spark/util/kvstore/KVTypeInfo.java | 40 ++- .../deploy/history/FsHistoryProvider.scala | 65 +++-- .../apache/spark/internal/config/Status.scala | 15 +- .../scheduler/EventLoggingListener.scala | 3 +- .../spark/scheduler/ReplayListenerBus.scala | 3 +- .../spark/status/AppStatusListener.scala | 177 +++++------- .../status/InMemoryStoreCheckpoint.scala | 136 +++++++++ .../org/apache/spark/status/LiveEntity.scala | 47 +-- .../org/apache/spark/status/api/v1/api.scala | 53 +++- .../org/apache/spark/status/storeTypes.scala | 270 ++++++++++++++++-- .../apache/spark/storage/StorageLevel.scala | 15 + .../spark/ui/scope/RDDOperationGraph.scala | 6 +- 14 files changed, 642 insertions(+), 216 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java index 9bc8c55bd5389..399b2dfb147f7 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java @@ -17,6 +17,7 @@ package org.apache.spark.util.kvstore; +import java.io.Serializable; import java.util.Arrays; import com.google.common.base.Preconditions; @@ -34,7 +35,7 @@ * This class is not efficient and is mostly meant to compare really small arrays, like those * generally used as indices and keys in a KVStore. 
*/ -class ArrayWrappers { +class ArrayWrappers implements Serializable { @SuppressWarnings("unchecked") public static Comparable forArray(Object a) { @@ -53,7 +54,7 @@ public static Comparable forArray(Object a) { return (Comparable) ret; } - private static class ComparableIntArray implements Comparable { + private static class ComparableIntArray implements Comparable, Serializable { private final int[] array; @@ -92,7 +93,8 @@ public int compareTo(ComparableIntArray other) { } } - private static class ComparableLongArray implements Comparable { + private static class ComparableLongArray + implements Comparable, Serializable { private final long[] array; @@ -131,7 +133,8 @@ public int compareTo(ComparableLongArray other) { } } - private static class ComparableByteArray implements Comparable { + private static class ComparableByteArray + implements Comparable, Serializable { private final byte[] array; @@ -170,7 +173,8 @@ public int compareTo(ComparableByteArray other) { } } - private static class ComparableObjectArray implements Comparable { + private static class ComparableObjectArray + implements Comparable, Serializable { private final Object[] array; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index 5dfd101b65767..effcffc001387 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -18,13 +18,7 @@ package org.apache.spark.util.kvstore; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.HashSet; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.BiConsumer; @@ -43,7 +37,7 @@ * according to the index. This saves memory but makes iteration more expensive. */ @Private -public class InMemoryStore implements KVStore { +public class InMemoryStore implements KVStore, Serializable { private Object metadata; private InMemoryLists inMemoryLists = new InMemoryLists(); @@ -144,7 +138,7 @@ private static KVStoreView emptyView() { * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a * class of type T to an InstanceList of type T. */ - private static class InMemoryLists { + private static class InMemoryLists implements Serializable { private final ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); @SuppressWarnings("unchecked") @@ -164,7 +158,7 @@ public void clear() { } } - private static class InstanceList { + private static class InstanceList implements Serializable { /** * A BiConsumer to control multi-entity removal. 
We use this in a forEach rather than an diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java index b8c5fab8709ed..a2fde1379e26f 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java @@ -17,6 +17,10 @@ package org.apache.spark.util.kvstore; +import java.io.IOException; +import java.io.Serializable; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.HashMap; @@ -31,7 +35,7 @@ * Wrapper around types managed in a KVStore, providing easy access to their indexed fields. */ @Private -public class KVTypeInfo { +public class KVTypeInfo implements Serializable { private final Class type; private final Map indices; @@ -120,7 +124,7 @@ Accessor getParentAccessor(String indexName) { /** * Abstracts the difference between invoking a Field and a Method. */ - interface Accessor { + interface Accessor extends Serializable { Object get(Object instance) throws ReflectiveOperationException; @@ -129,7 +133,7 @@ interface Accessor { private class FieldAccessor implements Accessor { - private final Field field; + private Field field; FieldAccessor(Field field) { this.field = field; @@ -144,11 +148,24 @@ public Object get(Object instance) throws ReflectiveOperationException { public Class getType() { return field.getType(); } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(field.getName()); + out.writeObject(field.getDeclaringClass()); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException, NoSuchFieldException { + String name = in.readUTF(); + Class clazz = (Class) in.readObject(); + field = clazz.getDeclaredField(name); + field.setAccessible(true); + } } private class MethodAccessor implements Accessor { - private final Method method; + private Method method; MethodAccessor(Method method) { this.method = method; @@ -163,6 +180,21 @@ public Object get(Object instance) throws ReflectiveOperationException { public Class getType() { return method.getReturnType(); } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(method.getName()); + out.writeObject(method.getDeclaringClass()); + out.writeObject(method.getParameterTypes()); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException, NoSuchMethodException { + String name = in.readUTF(); + Class clazz = (Class) in.readObject(); + Class[] parameters = (Class[]) in.readObject(); + method = clazz.getDeclaredMethod(name, parameters); + method.setAccessible(true); + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b53a894c55dfb..5ca360eb88872 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -30,8 +30,6 @@ import scala.io.Source import scala.util.Try import scala.xml.Node -import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy -import com.esotericsoftware.kryo.io.Input import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.MoreExecutors @@ -40,7 +38,6 
@@ import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} import org.apache.hadoop.hdfs.protocol.HdfsConstants import org.apache.hadoop.security.AccessControlException import org.fusesource.leveldbjni.internal.NativeDB -import org.objenesis.strategy.{SerializingInstantiatorStrategy, StdInstantiatorStrategy} import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil @@ -53,7 +50,7 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} @@ -135,6 +132,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_)) private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING) + // a JavaSerializer used to deserialize InMemoryStoreSnapshot + val serializer = new JavaSerializer(conf).newInstance() + // Visible for testing. private[history] val listing: KVStore = storePath.map { path => val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile() @@ -347,7 +347,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) loadDiskStore(sm, appId, attempt) case _ => - createInMemoryStore(appId, attempt) + createInMemoryStore(attempt) } } catch { case _: FileNotFoundException => @@ -446,6 +446,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // the end-user. 
!entry.getPath().getName().startsWith(".") && !entry.getPath().getName().endsWith(".ckp") && + !entry.getPath().getName().endsWith(".ckp.tmp") && !isBlacklisted(entry.getPath) } .filter { entry => @@ -963,7 +964,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val trackingStore = new ElementTrackingStore(store, replayConf) val replayBus = new ReplayListenerBus(eventSkipNum) val listener = new AppStatusListener(trackingStore, replayConf, false, - lastUpdateTime = Some(lastUpdated)) + lastUpdateTime = Some(lastUpdated), initLiveEntitiesFromStore = eventSkipNum > 0) replayBus.addListener(listener) for { @@ -1089,8 +1090,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata) } - private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = { - val imsSnapshot = getOrCreateInMemoryStoreSnapshot(appId) + private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { + val imsSnapshot = getOrCreateInMemoryStoreSnapshot(attempt) val store = imsSnapshot.store if (imsSnapshot.finished) { store @@ -1101,31 +1102,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - private def getOrCreateInMemoryStoreSnapshot(appId: String): InMemoryStoreSnapshot = { + private def getOrCreateInMemoryStoreSnapshot(attempt: AttemptInfoWrapper) + : InMemoryStoreSnapshot = { if (conf.get(IMS_CHECKPOINT_ENABLED)) { - val path = new Path(logDir, s"ims_$appId.ckp").toUri.getPath - logInfo(s"loading $path.") - try { - val fileIn = new FileInputStream(path) -// val serializer = new KryoSerializer(conf) -// val in = serializer.newInstance().deserializeStream(fileIn) -// val imsSnapshot = in.readObject[InMemoryStoreSnapshot]() -// in.close() - val kryo = new KryoSerializer(conf).newKryo() - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()) - val input = new Input(fileIn) - val imsSnapshot = kryo.readObject(input, classOf[InMemoryStoreSnapshot]) - input.close() - logInfo(s"Loaded InMemoryStoreSnapshot(ims, eventsSkipNum=${imsSnapshot.eventsNum}," + - s" finished=${imsSnapshot.finished}) from $path.") - imsSnapshot - } catch { - case e: Throwable => - val cause = Option(e.getCause) - logError(s"failed to load $path", cause.getOrElse(e)) - throw e + val ckpPath = new Path(logDir, attempt.logPath + ".ckp") + if (fs.exists(ckpPath)) { + try { + logInfo(s"Loading InMemoryStore checkpoint file: $ckpPath") + Utils.tryWithResource(EventLoggingListener.openEventLog(ckpPath, fs)) { in => + val objIn = serializer.deserializeStream(in) + val startNs = System.nanoTime() + val imsSnapshot = objIn.readObject[InMemoryStoreSnapshot]() + objIn.close() + val finishedNs = System.nanoTime() + val duration = TimeUnit.NANOSECONDS.toMillis(finishedNs - startNs) + logInfo(s"Loaded InMemoryStore, eventsNum=${imsSnapshot.eventsNum}, " + + s"finished=${imsSnapshot.finished}, took ${duration}ms") + // +1 for skipping SparkListenerLogStart + imsSnapshot.copy(eventsNum = imsSnapshot.eventsNum + 1) + } + } catch { + case e: Exception => + logError("Failed to load InMemoryStore checkpoint file, use log file instead", e) + InMemoryStoreSnapshot(new InMemoryStore, 0, false) + } + } else { + InMemoryStoreSnapshot(new InMemoryStore, 0, false) } - } else { InMemoryStoreSnapshot(new InMemoryStore, 0, false) } diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index 684cbcea9b9fb..62ecfd1aee0b6 100644 --- 
a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -19,6 +19,8 @@ package org.apache.spark.internal.config import java.util.concurrent.TimeUnit +import org.apache.spark.network.util.ByteUnit + private[spark] object Status { val IMS_CHECKPOINT_ENABLED = @@ -26,13 +28,20 @@ private[spark] object Status { .doc("Whether to checkpoint InMemoryStore in a live AppStatusListener, in order to " + "accelerate the startup speed of History Server.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val IMS_CHECKPOINT_BATCH_SIZE = ConfigBuilder("spark.appStateStore.ims.checkpoint.batchSize") - .doc("The minimal batch size to trigger a checkpoint for InMemoryStore.") + .doc("The minimal batch size to trigger the checkpoint for InMemoryStore.") .intConf - .createWithDefault(1000) + .createWithDefault(5000) + + val IMS_CHECKPOINT_BUFFER_SIZE = + ConfigBuilder("spark.appStateStore.ims.checkpoint.bufferSize") + .doc("Buffer size to use when checkpoint InMemoryStore to output streams, " + + "in KiB unless otherwise specified.") + .bytesConf(ByteUnit.KiB) + .createWithDefaultString("100k") val ASYNC_TRACKING_ENABLED = ConfigBuilder("spark.appStateStore.asyncTracking.enable") .booleanConf diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 48eb2da3015f8..c65b84f1c06d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -345,6 +345,7 @@ private[spark] class EventLoggingListener( } private[spark] object EventLoggingListener extends Logging { + val CKP = ".ckp" // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" @@ -430,7 +431,7 @@ private[spark] object EventLoggingListener extends Logging { def codecName(log: Path): Option[String] = { // Compression codec is encoded as an extension, e.g. 
app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) + val logName = log.getName.stripSuffix(CKP).stripSuffix(IN_PROGRESS) logName.split("\\.").tail.lastOption } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index c5ca39cc32e55..0315379af0768 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -76,8 +76,7 @@ private[spark] class ReplayListenerBus(eventSkipNum: Int = 0) try { val lineEntries = lines .zipWithIndex - .filter { case (_, index) => index >= eventSkipNum} - .filter { case (line, _) => eventsFilter(line) } + .filter { case (line, index) => index >= eventSkipNum && eventsFilter(line) } while (lineEntries.hasNext) { try { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index f36adfc375fd9..caf07f72310ed 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -17,113 +17,31 @@ package org.apache.spark.status -import java.io.{File, FileInputStream, FileOutputStream, ObjectOutputStream} +import java.io.BufferedOutputStream import java.util.Date -import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap -import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy -import com.esotericsoftware.kryo.io.{Input, Output} -import org.objenesis.strategy.{SerializingInstantiatorStrategy, StdInstantiatorStrategy} +import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{CPUS_PER_TASK, EVENT_LOG_DIR} +import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Status._ import org.apache.spark.scheduler._ -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.status.api.v1 +import org.apache.spark.status.api.v1.StageStatus import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.kvstore.InMemoryStore -case class InMemoryStoreSnapshot( - store: InMemoryStore, - eventsNum: Int, - finished: Boolean) - -private[spark] class InMemoryStoreCheckpoint( - store: InMemoryStore, - conf: SparkConf) extends Logging { - var lastRecordEventsNum: Int = 0 - var finished: Boolean = false - - // used to count the number of processed events in a live AppStatusListener - private var processedEventsNum = 0 - // TODO(wuyi) config - private val batchSize = conf.get(IMS_CHECKPOINT_BATCH_SIZE) - private var latch = new CountDownLatch(0) - @volatile var isDone = true - // use event log file dir directly ? 
- private val dir = Utils.resolveURI(conf.get(EVENT_LOG_DIR).stripSuffix("/")).getPath - private val executor = ThreadUtils.newDaemonSingleThreadExecutor( - "inmemorystore-checkpoint-thread") - var appInfo: v1.ApplicationInfo = _ - private var cnt = 0 - - private val checkpointTask = new Runnable { - override def run(): Unit = doCheckpoint() - } - - def await(): Unit = latch.await() - - def eventInc(finish: Boolean = false): Unit = { - processedEventsNum += 1 - val shouldCheckpoint = !finished && (processedEventsNum - lastRecordEventsNum >= - batchSize || finish) - if (shouldCheckpoint) { - latch = new CountDownLatch(1) - lastRecordEventsNum = processedEventsNum - if (finish) { - finished = true - doCheckpoint() - executor.shutdown() - } else { - executor.submit(checkpointTask) - } - } - } - - private def doCheckpoint(): Unit = { - cnt += 1 - // TODO(wuyi) rewrite - logInfo(s"Do checkpoint, cnt=$cnt, processedEventsNum = $processedEventsNum") - isDone = false - assert(appInfo != null, "appInfo is nnnnnnnnnnnn") - // TODO(wuyi) multiple attempts ? appInto is null ? - val file = new File(dir, s"ims_${appInfo.id}.ckp") - val fileOut = new FileOutputStream(file) -// val serializer = new KryoSerializer(conf) -// val out = serializer.newInstance().serializeStream(fileOut) -// out.writeObject[InMemoryStoreSnapshot]( -// InMemoryStoreSnapshot(store, lastRecordEventsNum, finished)) -// out.flush() -// out.close() - logInfo(s"be stageNume: ${store.count(classOf[StageDataWrapper])}") - val kryo = new KryoSerializer(conf).newKryo() - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()) - val output = new Output(fileOut) - kryo.writeObject(output, InMemoryStoreSnapshot(store, lastRecordEventsNum, finished)) - output.close() - - val fileIn = new FileInputStream(file) -// val kryo2 = new KryoSerializer(conf).newKryo() - val input = new Input(fileIn) - val imsSnapshot = kryo.readObject(input, classOf[InMemoryStoreSnapshot]) - input.close() - logInfo(s"af stageNume: ${imsSnapshot.store.count(classOf[StageDataWrapper])}") - logInfo(s"Loaded InMemoryStoreSnapshot(ims, eventsSkipNum=${imsSnapshot.eventsNum}," + - s" finished=${imsSnapshot.finished}) from ${file.getPath}.") - latch.countDown() - isDone = true - } -} - /** * A Spark listener that writes application information to a data store. The types written to the * store are defined in the `storeTypes.scala` file and are based on the public REST API. @@ -136,7 +54,8 @@ private[spark] class AppStatusListener( conf: SparkConf, live: Boolean, appStatusSource: Option[AppStatusSource] = None, - lastUpdateTime: Option[Long] = None) extends SparkListener with Logging { + lastUpdateTime: Option[Long] = None, + initLiveEntitiesFromStore: Boolean = false) extends SparkListener with Logging { private var sparkVersion = SPARK_VERSION private var appInfo: v1.ApplicationInfo = null @@ -175,10 +94,65 @@ private[spark] class AppStatusListener( /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. 
*/ private var lastFlushTimeNs = System.nanoTime() - private val imsCheckpoint = if (live && conf.get(IMS_CHECKPOINT_ENABLED)) { - Some(new InMemoryStoreCheckpoint(kvstore.store.asInstanceOf[InMemoryStore], conf)) - } else { - None + private val imsCheckpoint = createImsCheckpoint() + + private def createImsCheckpoint(): Option[InMemoryStoreCheckpoint] = { + if (live && conf.get(EVENT_LOG_ENABLED) && conf.get(IMS_CHECKPOINT_ENABLED)) { + Some(new InMemoryStoreCheckpoint( + kvstore.store.asInstanceOf[InMemoryStore], + conf, + this)) + } else { None } + } + + initLiveEntities() + + private def initLiveEntities(): Unit = { + if (!live && initLiveEntitiesFromStore) { + val imsStore = kvstore.store + imsStore.view(classOf[JobDataWrapper]) + .asScala.filter(_.info.status == JobExecutionStatus.RUNNING) + .map(_.toLiveJob).map(job => liveJobs.put(job.jobId, job)) + + imsStore.view(classOf[StageDataWrapper]).asScala.filter(_.info.status == StageStatus.ACTIVE) + .map { stageData => + val stageId = stageData.info.stageId + val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq + stageData.toLiveStage(jobs) + }.map { stage => + val stageId = stage.info.stageId + val stageAttempt = stage.info.attemptNumber() + liveStages.put((stageId, stageAttempt), stage) + + imsStore.view(classOf[ExecutorStageSummaryWrapper]).asScala + .filter { esummary => + esummary.stageId == stageId && esummary.stageAttemptId == stageAttempt } + .map(_.toLiveExecutorStageSummary) + .map { esummary => + stage.executorSummaries.put(esummary.executorId, esummary) + } + + imsStore.view(classOf[TaskDataWrapper]) + .parent(Array(stageId, stageAttempt)) + .index(TaskIndexNames.STATUS) + .first(TaskState.RUNNING.toString) + .last(TaskState.RUNNING.toString) + .closeableIterator().asScala + .map(_.toLiveTask) + .map(task => liveTasks.put(task.info.taskId, task)) + } + imsStore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) + .map(_.toLiveExecutor).map(exec => liveExecutors.put(exec.executorId, exec)) + imsStore.view(classOf[RDDStorageInfoWrapper]).asScala + .map { rddWrapper => + val liveRdd = rddWrapper.toLiveRDD(liveExecutors) + liveRDDs.put(liveRdd.info.id, liveRdd) + } + imsStore.view(classOf[PoolData]).asScala.map { poolData => + val schedulerPool = poolData.toSchedulerPool + pools.put(schedulerPool.name, schedulerPool) + } + } } kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) @@ -202,7 +176,6 @@ private[spark] class AppStatusListener( override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case SparkListenerLogStart(version) => sparkVersion = version - imsCheckpoint.foreach(_.eventInc()) case _ => } @@ -974,7 +947,6 @@ private[spark] class AppStatusListener( // Re-get the current system time because `flush` may be slow and `now` is stale. 
lastFlushTimeNs = System.nanoTime() } - imsCheckpoint.foreach(_.eventInc()) } override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { @@ -989,7 +961,6 @@ private[spark] class AppStatusListener( update(exec, now) } } - imsCheckpoint.foreach(_.eventInc()) } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { @@ -999,11 +970,15 @@ private[spark] class AppStatusListener( case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) case _ => } - imsCheckpoint.foreach(_.eventInc()) + imsCheckpoint.foreach { ims => + if (conf.get(EVENT_LOG_BLOCK_UPDATES)) { + ims.eventInc() + } + } } /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ - private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { + private[spark] def flush(entityFlushFunc: LiveEntity => Unit): Unit = { liveStages.values.asScala.foreach { stage => entityFlushFunc(stage) stage.executorSummaries.values.foreach(entityFlushFunc) @@ -1211,7 +1186,7 @@ private[spark] class AppStatusListener( } } - private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = { + private[spark] def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = { imsCheckpoint.foreach(_.await()) entity.write(kvstore, now, checkTriggers = last) } @@ -1219,7 +1194,7 @@ private[spark] class AppStatusListener( /** Update a live entity only if it hasn't been updated in the last configured period. */ private def maybeUpdate(entity: LiveEntity, now: Long): Unit = { if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs && - (imsCheckpoint.isEmpty || imsCheckpoint.map(_.isDone).get)) { + (imsCheckpoint.isEmpty || imsCheckpoint.get.isDone)) { update(entity, now) } } diff --git a/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala new file mode 100644 index 0000000000000..17fa34e5aefa1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status + +import java.io.BufferedOutputStream +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import org.apache.hadoop.fs.{FileUtil, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Status._ +import org.apache.spark.scheduler.EventLoggingListener +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.status.api.v1 +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore.InMemoryStore + +case class InMemoryStoreSnapshot( + store: InMemoryStore, + eventsNum: Int, + finished: Boolean) extends Serializable + + +private[spark] class InMemoryStoreCheckpoint( + store: InMemoryStore, + conf: SparkConf, + listener: AppStatusListener) extends Logging { + var lastRecordEventsNum: Int = 0 + var finished: Boolean = false + + // used to count the number of processed events in a live AppStatusListener + private var processedEventsNum = 0 + private val batchSize = conf.get(IMS_CHECKPOINT_BATCH_SIZE) + private val bufferSize = conf.get(IMS_CHECKPOINT_BUFFER_SIZE).toInt + private var latch = new CountDownLatch(0) + @volatile var isDone = true + // a JavaSerializer used to serialize InMemoryStoreSnapshot + private val serializer = new JavaSerializer(conf).newInstance() + private val logBaseDir = Utils.resolveURI(conf.get(EVENT_LOG_DIR).stripSuffix("/")) + private lazy val ckpPath = getCheckpointPath + private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + private val executor = ThreadUtils.newDaemonSingleThreadExecutor( + "inmemorystore-checkpoint-thread") + // should be inited before the first time checkpoint + var appInfo: v1.ApplicationInfo = _ + + private val checkpointTask = new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError(doCheckpoint()) + } + + private def getCheckpointPath: String = { + val appId = appInfo.id + val appAttemptId = appInfo.attempts.head.attemptId + EventLoggingListener.getLogPath(logBaseDir, appId, appAttemptId, None) + } + + def await(): Unit = latch.await() + + def eventInc(finish: Boolean = false): Unit = { + processedEventsNum += 1 + val shouldCheckpoint = !finished && (processedEventsNum - lastRecordEventsNum >= + batchSize || finish) + if (shouldCheckpoint) { + // flush to make sure that all processed events' related data have write into InMemoryStore + listener.flush(listener.update(_, System.nanoTime())) + latch = new CountDownLatch(1) + lastRecordEventsNum = processedEventsNum + if (finish) { + finished = true + doCheckpoint() + executor.shutdown() + } else { + executor.submit(checkpointTask) + } + } + } + + private def doCheckpoint(): Unit = { + try { + isDone = false + if (appInfo == null) { + logWarning("Haven't received event SparkListenerApplicationStart, skip checkpoint") + } else { + logInfo(s"Checkpoint InMemoryStore started, eventsNum=$processedEventsNum") + assert(appInfo != null, "appInfo is null") + val uri = new Path(ckpPath).toUri + val ckpFile = new Path(uri.getPath + + (if (!finished) EventLoggingListener.IN_PROGRESS else "") + EventLoggingListener.CKP) + val tmpFile = new Path(ckpFile + ".tmp") + val fileOut = fileSystem.create(tmpFile) + val bufferOut = new BufferedOutputStream(fileOut, bufferSize) + val objOut = serializer.serializeStream(bufferOut) + val startNs = System.nanoTime() + 
objOut.writeObject(InMemoryStoreSnapshot(store, lastRecordEventsNum, finished)) + fileOut.flush() + objOut.close() + FileUtil.copy(fileSystem, tmpFile, fileSystem, ckpFile, true, hadoopConf) + if (finished) { + val inProgressCkpFile = new Path(uri.getPath + EventLoggingListener.IN_PROGRESS + + EventLoggingListener.CKP) + if (!fileSystem.delete(inProgressCkpFile, true)) { + logWarning(s"Failed to delete $inProgressCkpFile") + } + if (fileSystem.exists(tmpFile) && !fileSystem.delete(tmpFile, true)) { + logWarning(s"Failed to delete $tmpFile") + } + } + val finishedNs = System.nanoTime() + val duration = TimeUnit.NANOSECONDS.toMillis(finishedNs - startNs) + logInfo(s"Checkpoint InMemoryStore finished, took $duration ms") + } + } finally { + latch.countDown() + isDone = true + } + } +} diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index c5a233f14aa6d..1afeb370b7242 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -58,7 +58,7 @@ private[spark] abstract class LiveEntity { } -private class LiveJob( +private[spark] class LiveJob( val jobId: Int, name: String, val submissionTime: Option[Date], @@ -73,6 +73,8 @@ private class LiveJob( // Holds both the stage ID and the task index, packed into a single long value. val completedIndices = new OpenHashSet[Long]() + // will only be set when InMemoryStore checkpoint is enabled. + var completedIndicesNum = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -84,6 +86,8 @@ private class LiveJob( var completionTime: Option[Date] = None var completedStages: Set[Int] = Set() + // will only be set when InMemoryStore checkpoint is enabled. 
+ var completedStagesNum = 0 var activeStages = 0 var failedStages = 0 @@ -103,9 +107,9 @@ private class LiveJob( skippedTasks, failedTasks, killedTasks, - completedIndices.size, + completedIndices.size + completedIndicesNum, activeStages, - completedStages.size, + completedStages.size + completedStagesNum, skippedStages.size, failedStages, killedSummary) @@ -114,7 +118,7 @@ private class LiveJob( } -private class LiveTask( +private[spark] class LiveTask( var info: TaskInfo, stageId: Int, stageAttemptId: Int, @@ -228,7 +232,7 @@ private class LiveTask( } -private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { +private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { var hostPort: String = null var host: String = null @@ -270,7 +274,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE def hasMemoryInfo: Boolean = totalOnHeap >= 0L // peak values for executor level metrics - val peakExecutorMetrics = new ExecutorMetrics() + var peakExecutorMetrics = new ExecutorMetrics() def hostname: String = if (host != null) host else hostPort.split(":")(0) @@ -313,10 +317,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE } } -private class LiveExecutorStageSummary( +private[spark] class LiveExecutorStageSummary( stageId: Int, attemptId: Int, - executorId: String) extends LiveEntity { + val executorId: String) extends LiveEntity { import LiveEntityHelpers._ @@ -350,7 +354,7 @@ private class LiveExecutorStageSummary( } -private class LiveStage extends LiveEntity { +private[spark] class LiveStage extends LiveEntity { import LiveEntityHelpers._ @@ -367,6 +371,8 @@ private class LiveStage extends LiveEntity { var completedTasks = 0 var failedTasks = 0 val completedIndices = new OpenHashSet[Int]() + // will only be set when InMemoryStore checkpoint is enabled. 
+ var completedIndicesNum = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -377,9 +383,9 @@ private class LiveStage extends LiveEntity { var metrics = createMetrics(default = 0L) - val executorSummaries = new HashMap[String, LiveExecutorStageSummary]() + var executorSummaries = new HashMap[String, LiveExecutorStageSummary]() - val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0) + var activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0) var blackListedExecutors = new HashSet[String]() @@ -402,7 +408,7 @@ private class LiveStage extends LiveEntity { numCompleteTasks = completedTasks, numFailedTasks = failedTasks, numKilledTasks = killedTasks, - numCompletedIndices = completedIndices.size, + numCompletedIndices = completedIndices.size + completedIndicesNum, submissionTime = info.submissionTime.map(new Date(_)), firstTaskLaunchedTime = @@ -455,7 +461,7 @@ private class LiveStage extends LiveEntity { } -private class LiveRDDPartition(val blockName: String) { +private[spark] class LiveRDDPartition(val blockName: String) extends Serializable { import LiveEntityHelpers._ @@ -486,7 +492,7 @@ private class LiveRDDPartition(val blockName: String) { } -private class LiveRDDDistribution(exec: LiveExecutor) { +private[spark] class LiveRDDDistribution(exec: LiveExecutor) { import LiveEntityHelpers._ @@ -503,6 +509,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { def toApi(): v1.RDDDataDistribution = { if (lastUpdate == null) { lastUpdate = new v1.RDDDataDistribution( + executorId, weakIntern(exec.hostPort), memoryUsed, exec.maxMemory - exec.memoryUsed, @@ -517,7 +524,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { } -private class LiveRDD(val info: RDDInfo) extends LiveEntity { +private[spark] class LiveRDD(val info: RDDInfo) extends LiveEntity { import LiveEntityHelpers._ @@ -525,10 +532,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { var memoryUsed = 0L var diskUsed = 0L - private val partitions = new HashMap[String, LiveRDDPartition]() - private val partitionSeq = new RDDPartitionSeq() + private[spark] val partitions = new HashMap[String, LiveRDDPartition]() + private[spark] val partitionSeq = new RDDPartitionSeq() - private val distributions = new HashMap[String, LiveRDDDistribution]() + private[spark] val distributions = new HashMap[String, LiveRDDDistribution]() def setStorageLevel(level: String): Unit = { this.storageLevel = weakIntern(level) @@ -586,7 +593,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { } -private class SchedulerPool(name: String) extends LiveEntity { +private[spark] class SchedulerPool(val name: String) extends LiveEntity { var stageIds = Set[Int]() @@ -736,7 +743,7 @@ private object LiveEntityHelpers { * Internally, the sequence is mutable, and elements can modify the data they expose. Additions and * removals are O(1). It is not safe to do multiple writes concurrently. 
*/ -private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { +private[spark] class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] with Serializable { @volatile private var _head: LiveRDDPartition = null @volatile private var _tail: LiveRDDPartition = null diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 7b3636fdc5b47..4bd4d4cd649cd 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.metrics.ExecutorMetricType +import org.apache.spark.status.{LiveExecutor, LiveRDDDistribution, LiveRDDPartition} case class ApplicationInfo private[spark]( id: String, @@ -76,7 +77,7 @@ class ExecutorStageSummary private[spark]( val shuffleWriteRecords : Long, val memoryBytesSpilled : Long, val diskBytesSpilled : Long, - val isBlacklistedForStage: Boolean) + val isBlacklistedForStage: Boolean) extends Serializable class ExecutorSummary private[spark]( val id: String, @@ -107,13 +108,13 @@ class ExecutorSummary private[spark]( @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) val peakMemoryMetrics: Option[ExecutorMetrics], - val attributes: Map[String, String]) + val attributes: Map[String, String]) extends Serializable class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, - val totalOffHeapStorageMemory: Long) + val totalOffHeapStorageMemory: Long) extends Serializable /** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ private[spark] class ExecutorMetricsJsonDeserializer @@ -165,7 +166,7 @@ class JobData private[spark]( val numCompletedStages: Int, val numSkippedStages: Int, val numFailedStages: Int, - val killedTasksSummary: Map[String, Int]) + val killedTasksSummary: Map[String, Int]) extends Serializable class RDDStorageInfo private[spark]( val id: Int, @@ -176,9 +177,10 @@ class RDDStorageInfo private[spark]( val memoryUsed: Long, val diskUsed: Long, val dataDistribution: Option[Seq[RDDDataDistribution]], - val partitions: Option[Seq[RDDPartitionInfo]]) + val partitions: Option[Seq[RDDPartitionInfo]]) extends Serializable class RDDDataDistribution private[spark]( + val executorId: String, val address: String, val memoryUsed: Long, val memoryRemaining: Long, @@ -190,14 +192,35 @@ class RDDDataDistribution private[spark]( @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryRemaining: Option[Long], @JsonDeserialize(contentAs = classOf[JLong]) - val offHeapMemoryRemaining: Option[Long]) + val offHeapMemoryRemaining: Option[Long]) extends Serializable { + + def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor]) + : LiveRDDDistribution = { + // TODO (wuyi) does the liveexecutor always exists ? 
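+    // Note: this relies on the executor still being present in `executors`; if a restored
+    // snapshot ever contains a distribution for an executor that is no longer tracked, the
+    // .get below throws NoSuchElementException instead of skipping the distribution.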
+ val exec = executors.get(executorId).get + val liveRDDDistribution = new LiveRDDDistribution(exec) + liveRDDDistribution.memoryUsed = memoryUsed + liveRDDDistribution.diskUsed = diskUsed + liveRDDDistribution.onHeapUsed = onHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.offHeapUsed = offHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.lastUpdate = this + liveRDDDistribution + } +} class RDDPartitionInfo private[spark]( val blockName: String, val storageLevel: String, val memoryUsed: Long, val diskUsed: Long, - val executors: Seq[String]) + val executors: Seq[String]) extends Serializable { + + def toLiveRDDPartition: LiveRDDPartition = { + val liveRDDPartition = new LiveRDDPartition(blockName) + liveRDDPartition.value = this + liveRDDPartition + } +} class StageData private[spark]( val status: StageStatus, @@ -250,7 +273,7 @@ class StageData private[spark]( val accumulatorUpdates: Seq[AccumulableInfo], val tasks: Option[Map[Long, TaskData]], val executorSummary: Option[Map[String, ExecutorStageSummary]], - val killedTasksSummary: Map[String, Int]) + val killedTasksSummary: Map[String, Int]) extends Serializable class TaskData private[spark]( val taskId: Long, @@ -286,15 +309,15 @@ class TaskMetrics private[spark]( val inputMetrics: InputMetrics, val outputMetrics: OutputMetrics, val shuffleReadMetrics: ShuffleReadMetrics, - val shuffleWriteMetrics: ShuffleWriteMetrics) + val shuffleWriteMetrics: ShuffleWriteMetrics) extends Serializable class InputMetrics private[spark]( val bytesRead: Long, - val recordsRead: Long) + val recordsRead: Long) extends Serializable class OutputMetrics private[spark]( val bytesWritten: Long, - val recordsWritten: Long) + val recordsWritten: Long) extends Serializable class ShuffleReadMetrics private[spark]( val remoteBlocksFetched: Long, @@ -303,12 +326,12 @@ class ShuffleReadMetrics private[spark]( val remoteBytesRead: Long, val remoteBytesReadToDisk: Long, val localBytesRead: Long, - val recordsRead: Long) + val recordsRead: Long) extends Serializable class ShuffleWriteMetrics private[spark]( val bytesWritten: Long, val writeTime: Long, - val recordsWritten: Long) + val recordsWritten: Long) extends Serializable class TaskMetricDistributions private[spark]( val quantiles: IndexedSeq[Double], @@ -368,12 +391,12 @@ class ApplicationEnvironmentInfo private[spark] ( val sparkProperties: Seq[(String, String)], val hadoopProperties: Seq[(String, String)], val systemProperties: Seq[(String, String)], - val classpathEntries: Seq[(String, String)]) + val classpathEntries: Seq[(String, String)]) extends Serializable class RuntimeInfo private[spark]( val javaVersion: String, val javaHome: String, - val scalaVersion: String) + val scalaVersion: String) extends Serializable case class StackTrace(elems: Seq[String]) { override def toString: String = elems.mkString diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9da5bea8bf5c4..d60233406350e 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -17,27 +17,35 @@ package org.apache.spark.status +import java.io.{ObjectInputStream, ObjectOutputStream} import java.lang.{Long => JLong} import java.util.Date +import scala.collection.mutable.ArrayBuffer + import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import org.apache.spark.executor.ExecutorMetrics +import 
org.apache.spark.scheduler.{StageInfo, TaskInfo, TaskLocality} import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1._ +import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.scope._ +import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.KVIndex private[spark] case class AppStatusStoreMetadata(version: Long) -private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { +private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) extends Serializable { @JsonIgnore @KVIndex def id: String = info.id } -private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvironmentInfo) { +private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvironmentInfo) + extends Serializable { /** * There's always a single ApplicationEnvironmentInfo object per application, so this @@ -48,7 +56,37 @@ private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvi } -private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { +private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) extends Serializable { + + def toLiveExecutor: LiveExecutor = { + val liveExecutor = new LiveExecutor(info.id, info.addTime.getTime) + liveExecutor.hostPort = info.hostPort + liveExecutor.totalCores = info.totalCores + liveExecutor.rddBlocks = info.rddBlocks + liveExecutor.memoryUsed = info.memoryUsed + liveExecutor.diskUsed = info.diskUsed + liveExecutor.maxTasks = info.maxTasks + liveExecutor.maxMemory = info.maxMemory + liveExecutor.totalTasks = info.totalTasks + liveExecutor.activeTasks = info.activeTasks + liveExecutor.completedTasks = info.completedTasks + liveExecutor.failedTasks = info.failedTasks + liveExecutor.totalDuration = info.totalDuration + liveExecutor.totalGcTime = info.totalGCTime + liveExecutor.totalInputBytes = info.totalInputBytes + liveExecutor.totalShuffleRead = info.totalShuffleRead + liveExecutor.totalShuffleWrite = info.totalShuffleWrite + liveExecutor.isBlacklisted = info.isBlacklisted + liveExecutor.blacklistedInStages = info.blacklistedInStages + liveExecutor.executorLogs = info.executorLogs + liveExecutor.attributes = info.attributes + liveExecutor.totalOnHeap = info.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(-1) + liveExecutor.totalOffHeap = info.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0) + liveExecutor.usedOnHeap = info.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0) + liveExecutor.usedOffHeap = info.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0) + liveExecutor.peakExecutorMetrics = info.peakMemoryMetrics.getOrElse(new ExecutorMetrics()) + liveExecutor + } @JsonIgnore @KVIndex private def id: String = info.id @@ -69,7 +107,30 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { private[spark] class JobDataWrapper( val info: JobData, val skippedStages: Set[Int], - val sqlExecutionId: Option[Long]) { + val sqlExecutionId: Option[Long]) extends Serializable { + + def toLiveJob: LiveJob = { + val liveJob = new LiveJob( + info.jobId, + info.name, + info.submissionTime, + info.stageIds, + info.jobGroup, + info.numTasks, + sqlExecutionId) + liveJob.activeTasks = info.numActiveTasks + liveJob.completedTasks = info.numCompletedTasks + liveJob.failedTasks = info.numFailedTasks + liveJob.completedIndicesNum = info.numCompletedIndices + liveJob.killedTasks = info.numKilledTasks + liveJob.killedSummary = info.killedTasksSummary + liveJob.skippedTasks = 
info.numSkippedTasks + liveJob.skippedStages = skippedStages + liveJob.completedStagesNum = info.numCompletedStages + liveJob.activeStages = info.numActiveStages + liveJob.failedStages = info.numFailedStages + liveJob + } @JsonIgnore @KVIndex private def id: Int = info.jobId @@ -82,7 +143,66 @@ private[spark] class StageDataWrapper( val info: StageData, val jobIds: Set[Int], @JsonDeserialize(contentAs = classOf[JLong]) - val locality: Map[String, Long]) { + val locality: Map[String, Long]) extends Serializable { + + def toLiveStage(jobs: Seq[LiveJob]): LiveStage = { + val liveStage = new LiveStage + val firstLaunchTime = if (info.firstTaskLaunchedTime.isEmpty) { + Long.MaxValue + } else { + info.firstTaskLaunchedTime.get.getTime + } + val metrics = LiveEntityHelpers.createMetrics( + info.executorDeserializeTime, + info.executorDeserializeCpuTime, + info.executorRunTime, + info.executorCpuTime, + info.resultSize, + info.jvmGcTime, + info.resultSerializationTime, + info.memoryBytesSpilled, + info.diskBytesSpilled, + info.peakExecutionMemory, + info.inputBytes, + info.inputRecords, + info.outputBytes, + info.outputRecords, + info.shuffleRemoteBlocksFetched, + info.shuffleLocalBlocksFetched, + info.shuffleFetchWaitTime, + info.shuffleRemoteBytesRead, + info.shuffleRemoteBytesReadToDisk, + info.shuffleLocalBytesRead, + info.shuffleReadRecords, + info.shuffleWriteBytes, + info.shuffleWriteTime, + info.shuffleWriteRecords + ) + val stageInfo = new StageInfo( + info.stageId, + info.attemptId, + info.name, + info.numTasks, + Nil, + Nil, + info.details) + liveStage.jobs = jobs + liveStage.jobIds = jobs.map(_.jobId).toSet + liveStage.info = stageInfo + liveStage.status = info.status + liveStage.description = info.description + liveStage.schedulingPool = info.schedulingPool + liveStage.activeTasks = info.numActiveTasks + liveStage.completedTasks = info.numCompleteTasks + liveStage.failedTasks = info.numFailedTasks + liveStage.completedIndicesNum = info.numCompletedIndices + liveStage.killedTasks = info.numKilledTasks + liveStage.killedSummary = info.killedTasksSummary + liveStage.firstLaunchTime = firstLaunchTime + liveStage.localitySummary = locality + liveStage.metrics = metrics + liveStage + } @JsonIgnore @KVIndex private[this] val id: Array[Int] = Array(info.stageId, info.attemptId) @@ -231,7 +351,7 @@ private[spark] class TaskDataWrapper( val shuffleRecordsWritten: Long, val stageId: Int, - val stageAttemptId: Int) { + val stageAttemptId: Int) extends Serializable { def hasMetrics: Boolean = executorDeserializeTime >= 0 @@ -290,6 +410,23 @@ private[spark] class TaskDataWrapper( gettingResultTime = 0L) } + def toLiveTask: LiveTask = { + val taskInfo = + new TaskInfo( + taskId, + index, + attempt, + launchTime, + executorId, + host, + TaskLocality.withName(taskLocality), + speculative) + taskInfo.gettingResultTime = gettingResultTime + val lastUpdateTime = duration + launchTime + val liveTask = new LiveTask(taskInfo, stageId, stageAttemptId, Some(lastUpdateTime)) + liveTask + } + @JsonIgnore @KVIndex(TaskIndexNames.STAGE) private def stage: Array[Int] = Array(stageId, stageAttemptId) @@ -352,7 +489,30 @@ private[spark] class TaskDataWrapper( private def completionTime: Long = launchTime + duration } -private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { +private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) extends Serializable { + + def toLiveRDD(executors: scala.collection.Map[String, LiveExecutor]): LiveRDD = { + val rddInfo = new RDDInfo( + info.id, + info.name, 
+ info.numPartitions, + StorageLevel.fromDescription(info.storageLevel), + false, + Nil) + val liveRDD = new LiveRDD(rddInfo) + liveRDD.memoryUsed = info.memoryUsed + liveRDD.diskUsed = info.diskUsed + info.partitions.get.foreach { rddPartition => + val liveRDDPartition = rddPartition.toLiveRDDPartition + liveRDD.partitions.put(rddPartition.blockName, liveRDDPartition) + liveRDD.partitionSeq.addPartition(liveRDDPartition) + } + info.dataDistribution.get.foreach { rddDist => + val liveRDDDist = rddDist.toLiveRDDDistribution(executors) + liveRDD.distributions.put(liveRDDDist.executorId, liveRDDDist) + } + liveRDD + } @JsonIgnore @KVIndex def id: Int = info.id @@ -366,7 +526,42 @@ private[spark] class ExecutorStageSummaryWrapper( val stageId: Int, val stageAttemptId: Int, val executorId: String, - val info: ExecutorStageSummary) { + val info: ExecutorStageSummary) extends Serializable { + + def toLiveExecutorStageSummary: LiveExecutorStageSummary = { + val liveESSummary = new LiveExecutorStageSummary(stageId, stageAttemptId, executorId) + val metrics = LiveEntityHelpers.createMetrics( + executorDeserializeTime = 0, + executorDeserializeCpuTime = 0, + executorRunTime = 0, + executorCpuTime = 0, + resultSize = 0, + jvmGcTime = 0, + resultSerializationTime = 0, + memoryBytesSpilled = info.memoryBytesSpilled, + diskBytesSpilled = info.diskBytesSpilled, + peakExecutionMemory = 0, + inputBytesRead = info.inputBytes, + inputRecordsRead = info.inputRecords, + outputBytesWritten = info.outputBytes, + outputRecordsWritten = info.outputRecords, + shuffleRemoteBlocksFetched = 0, + shuffleLocalBlocksFetched = 0, + shuffleFetchWaitTime = 0, + shuffleRemoteBytesRead = info.shuffleRead, + shuffleRemoteBytesReadToDisk = 0, + shuffleLocalBytesRead = 0, + shuffleRecordsRead = info.shuffleReadRecords, + shuffleBytesWritten = info.shuffleWrite, + shuffleWriteTime = 0, + shuffleRecordsWritten = info.shuffleWriteRecords) + liveESSummary.taskTime = info.taskTime + liveESSummary.succeededTasks = info.succeededTasks + liveESSummary.failedTasks = info.failedTasks + liveESSummary.isBlacklisted = info.isBlacklistedForStage + liveESSummary.metrics = metrics + liveESSummary + } @JsonIgnore @KVIndex private val _id: Array[Any] = Array(stageId, stageAttemptId, executorId) @@ -388,7 +583,7 @@ private[spark] class StreamBlockData( val useDisk: Boolean, val deserialized: Boolean, val memSize: Long, - val diskSize: Long) { + val diskSize: Long) extends Serializable { @JsonIgnore @KVIndex def key: Array[String] = Array(name, executorId) @@ -396,10 +591,10 @@ private[spark] class StreamBlockData( } private[spark] class RDDOperationClusterWrapper( - val id: String, - val name: String, - val childNodes: Seq[RDDOperationNode], - val childClusters: Seq[RDDOperationClusterWrapper]) { + var id: String, + var name: String, + var childNodes: Seq[RDDOperationNode], + var childClusters: Seq[RDDOperationClusterWrapper]) extends Serializable { def toRDDOperationCluster(): RDDOperationCluster = { val isBarrier = childNodes.exists(_.barrier) @@ -412,14 +607,40 @@ private[spark] class RDDOperationClusterWrapper( cluster } + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { + out.writeUTF(id) + out.writeUTF(name) + val nodeSize = childNodes.size + out.writeInt(nodeSize) + childNodes.foreach(out.writeObject) + val clusterSize = childClusters.size + out.writeInt(clusterSize) + childClusters.foreach(out.writeObject) + } + + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { + id = 
in.readUTF() + name = in.readUTF() + val nodeSize = in.readInt() + val nodeBuffer = new ArrayBuffer[RDDOperationNode](nodeSize) + (0 until nodeSize).foreach(_ => + nodeBuffer += in.readObject().asInstanceOf[RDDOperationNode]) + childNodes = nodeBuffer + val clusterSize = in.readInt() + val clusterBuffer = new ArrayBuffer[RDDOperationClusterWrapper](clusterSize) + (0 until clusterSize).foreach(_ => + clusterBuffer += in.readObject().asInstanceOf[RDDOperationClusterWrapper]) + childClusters = clusterBuffer + } } private[spark] class RDDOperationGraphWrapper( - @KVIndexParam val stageId: Int, - val edges: Seq[RDDOperationEdge], - val outgoingEdges: Seq[RDDOperationEdge], - val incomingEdges: Seq[RDDOperationEdge], - val rootCluster: RDDOperationClusterWrapper) { + @KVIndexParam var stageId: Int, + var edges: Seq[RDDOperationEdge], + var outgoingEdges: Seq[RDDOperationEdge], + var incomingEdges: Seq[RDDOperationEdge], + var rootCluster: RDDOperationClusterWrapper) + extends Serializable { def toRDDOperationGraph(): RDDOperationGraph = { new RDDOperationGraph(edges, outgoingEdges, incomingEdges, rootCluster.toRDDOperationCluster()) @@ -429,7 +650,14 @@ private[spark] class RDDOperationGraphWrapper( private[spark] class PoolData( @KVIndexParam val name: String, - val stageIds: Set[Int]) + val stageIds: Set[Int]) extends Serializable { + + def toSchedulerPool: SchedulerPool = { + val pool = new SchedulerPool(name) + pool.stageIds = stageIds + pool + } +} /** * A class with information about an app, to be used by the UI. There's only one instance of @@ -437,7 +665,7 @@ private[spark] class PoolData( */ private[spark] class AppSummary( val numCompletedJobs: Int, - val numCompletedStages: Int) { + val numCompletedStages: Int) extends Serializable { @KVIndex def id: String = classOf[AppSummary].getName() @@ -484,7 +712,7 @@ private[spark] class CachedQuantile( val shuffleWriteBytes: Double, val shuffleWriteRecords: Double, - val shuffleWriteTime: Double) { + val shuffleWriteTime: Double) extends Serializable { @KVIndex @JsonIgnore def id: Array[Any] = Array(stageId, stageAttemptId, quantile) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4c6998d7a8e20..d9a8a3f0de3e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -163,6 +163,21 @@ object StorageLevel { val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) + + /** + * :: DeveloperApi :: + * Return the StorageLevel object with the specified description. + */ + @DeveloperApi + def fromDescription(desc: String): StorageLevel = { + val (useDisk_, useMemory_, useOffHeap_, deserialized_) = { + (desc.contains("Disk"), desc.contains("Memory"), + desc.contains("off heap"), desc.contains("Deserialized")) + } + val replication_ = desc.split(" ").takeRight(2)(0).dropRight(1).toInt + new StorageLevel(useDisk_, useMemory_, useOffHeap_, deserialized_, replication_) + } + /** * :: DeveloperApi :: * Return the StorageLevel object with the specified name. 
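The new `fromDescription` above is meant to invert the strings produced by `StorageLevel.description`, which render levels along the lines of "Disk Memory Deserialized 2x Replicated" (off-heap levels include "Memory (off heap)"), so the replication factor is the token just before "Replicated". A minimal round-trip sanity check, assuming that format and StorageLevel's structural equality (e.g. in a spark-shell):

    import org.apache.spark.storage.StorageLevel
    // Parsing the rendered description should recover the original level.
    assert(StorageLevel.fromDescription(StorageLevel.MEMORY_AND_DISK_2.description) ==
      StorageLevel.MEMORY_AND_DISK_2)
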
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 9ace324322947..1e61b9cfa33df 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -47,13 +47,13 @@ private[spark] case class RDDOperationNode( name: String, cached: Boolean, barrier: Boolean, - callsite: String) + callsite: String) extends Serializable /** * A directed edge connecting two nodes in an RDDOperationGraph. * This represents an RDD dependency. */ -private[spark] case class RDDOperationEdge(fromId: Int, toId: Int) +private[spark] case class RDDOperationEdge(fromId: Int, toId: Int) extends Serializable /** * A cluster that groups nodes together in an RDDOperationGraph. @@ -64,7 +64,7 @@ private[spark] case class RDDOperationEdge(fromId: Int, toId: Int) private[spark] class RDDOperationCluster( val id: String, val barrier: Boolean, - private var _name: String) { + private var _name: String) extends Serializable { private val _childNodes = new ListBuffer[RDDOperationNode] private val _childClusters = new ListBuffer[RDDOperationCluster] From bd5144951c2198ab6bda6410ef27803d407b46f1 Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 27 Aug 2019 18:49:25 +0800 Subject: [PATCH 3/7] ckp -> checkpoint --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 +++--- .../org/apache/spark/scheduler/EventLoggingListener.scala | 4 ++-- .../org/apache/spark/status/InMemoryStoreCheckpoint.scala | 5 +++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5ca360eb88872..f9ac37c056b1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -445,8 +445,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. 
!entry.getPath().getName().startsWith(".") && - !entry.getPath().getName().endsWith(".ckp") && - !entry.getPath().getName().endsWith(".ckp.tmp") && + !entry.getPath().getName().endsWith(EventLoggingListener.CHECKPOINT) && + !entry.getPath().getName().endsWith(s"${EventLoggingListener.CHECKPOINT}.tmp") && !isBlacklisted(entry.getPath) } .filter { entry => @@ -1105,7 +1105,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def getOrCreateInMemoryStoreSnapshot(attempt: AttemptInfoWrapper) : InMemoryStoreSnapshot = { if (conf.get(IMS_CHECKPOINT_ENABLED)) { - val ckpPath = new Path(logDir, attempt.logPath + ".ckp") + val ckpPath = new Path(logDir, attempt.logPath + EventLoggingListener.CHECKPOINT) if (fs.exists(ckpPath)) { try { logInfo(s"Loading InMemoryStore checkpoint file: $ckpPath") diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index c65b84f1c06d9..d1c1f0df68df4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -345,7 +345,7 @@ private[spark] class EventLoggingListener( } private[spark] object EventLoggingListener extends Logging { - val CKP = ".ckp" + val CHECKPOINT = ".checkpoint" // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" @@ -431,7 +431,7 @@ private[spark] object EventLoggingListener extends Logging { def codecName(log: Path): Option[String] = { // Compression codec is encoded as an extension, e.g. app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(CKP).stripSuffix(IN_PROGRESS) + val logName = log.getName.stripSuffix(CHECKPOINT).stripSuffix(IN_PROGRESS) logName.split("\\.").tail.lastOption } diff --git a/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala index 17fa34e5aefa1..fca7223674283 100644 --- a/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala +++ b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala @@ -104,7 +104,8 @@ private[spark] class InMemoryStoreCheckpoint( assert(appInfo != null, "appInfo is null") val uri = new Path(ckpPath).toUri val ckpFile = new Path(uri.getPath + - (if (!finished) EventLoggingListener.IN_PROGRESS else "") + EventLoggingListener.CKP) + (if (!finished) EventLoggingListener.IN_PROGRESS else "") + + EventLoggingListener.CHECKPOINT) val tmpFile = new Path(ckpFile + ".tmp") val fileOut = fileSystem.create(tmpFile) val bufferOut = new BufferedOutputStream(fileOut, bufferSize) @@ -116,7 +117,7 @@ private[spark] class InMemoryStoreCheckpoint( FileUtil.copy(fileSystem, tmpFile, fileSystem, ckpFile, true, hadoopConf) if (finished) { val inProgressCkpFile = new Path(uri.getPath + EventLoggingListener.IN_PROGRESS + - EventLoggingListener.CKP) + EventLoggingListener.CHECKPOINT) if (!fileSystem.delete(inProgressCkpFile, true)) { logWarning(s"Failed to delete $inProgressCkpFile") } From 9be2a48941b22bc5b96773512bf4162f595977c9 Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 27 Aug 2019 18:51:54 +0800 Subject: [PATCH 4/7] remove ims in checkpoint config --- .../main/scala/org/apache/spark/internal/config/Status.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
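With the `ims` segment dropped, the checkpoint feature is toggled via the shortened key, for example in spark-defaults.conf (or with --conf at submit time); only the `enabled` key is shown here since the other keys in this series are unchanged:

    spark.appStateStore.checkpoint.enabled   true
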
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index 62ecfd1aee0b6..5493406e2d718 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -24,7 +24,7 @@ import org.apache.spark.network.util.ByteUnit private[spark] object Status { val IMS_CHECKPOINT_ENABLED = - ConfigBuilder("spark.appStateStore.ims.checkpoint.enabled") + ConfigBuilder("spark.appStateStore.checkpoint.enabled") .doc("Whether to checkpoint InMemoryStore in a live AppStatusListener, in order to " + "accelerate the startup speed of History Server.") .booleanConf From 30c48752037d8ddc20161d8f8ecec8e904df9d1b Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 27 Aug 2019 19:33:57 +0800 Subject: [PATCH 5/7] update shouldCheckpoint condition --- .../org/apache/spark/status/InMemoryStoreCheckpoint.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala index fca7223674283..2b19e8c0b77df 100644 --- a/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala +++ b/core/src/main/scala/org/apache/spark/status/InMemoryStoreCheckpoint.scala @@ -77,8 +77,8 @@ private[spark] class InMemoryStoreCheckpoint( def eventInc(finish: Boolean = false): Unit = { processedEventsNum += 1 - val shouldCheckpoint = !finished && (processedEventsNum - lastRecordEventsNum >= - batchSize || finish) + val shouldCheckpoint = isDone && (finish || processedEventsNum - lastRecordEventsNum >= + batchSize) if (shouldCheckpoint) { // flush to make sure that all processed events' related data have write into InMemoryStore listener.flush(listener.update(_, System.nanoTime())) From 2856b852c14b1d9879059e51612c80a5ba69b57f Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 27 Aug 2019 19:52:28 +0800 Subject: [PATCH 6/7] add await() before write/delete --- .../scala/org/apache/spark/status/AppStatusListener.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index caf07f72310ed..304cd4ee81996 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -474,6 +474,7 @@ private[spark] class AppStatusListener( graph.outgoingEdges, graph.incomingEdges, newRDDOperationCluster(graph.rootCluster)) + imsCheckpoint.foreach(_.await()) kvstore.write(uigraph) } imsCheckpoint.foreach(_.eventInc()) @@ -546,6 +547,7 @@ private[spark] class AppStatusListener( update(job, now, last = true) if (job.status == JobExecutionStatus.SUCCEEDED) { appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) + imsCheckpoint.foreach(_.await()) kvstore.write(appSummary) } } @@ -839,6 +841,7 @@ private[spark] class AppStatusListener( } if (stage.status == v1.StageStatus.COMPLETE) { appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) + imsCheckpoint.foreach(_.await()) kvstore.write(appSummary) } } @@ -905,6 +908,7 @@ private[spark] class AppStatusListener( } } + imsCheckpoint.foreach(_.await()) kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) imsCheckpoint.foreach(_.eventInc()) } @@ -1125,8 +1129,10 @@ 
private[spark] class AppStatusListener( storageLevel.deserialized, event.blockUpdatedInfo.memSize, event.blockUpdatedInfo.diskSize) + imsCheckpoint.foreach(_.await()) kvstore.write(data) } else { + imsCheckpoint.foreach(_.await()) kvstore.delete(classOf[StreamBlockData], Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId)) } From a08c3e90c1553f81dae6fcf99e2179cfd95f93f9 Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 27 Aug 2019 19:54:10 +0800 Subject: [PATCH 7/7] give up task cleaning while checkpointing --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 304cd4ee81996..08379dc2d66bc 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -1289,6 +1289,8 @@ private[spark] class AppStatusListener( } private def cleanupTasks(stage: LiveStage): Unit = { + if (imsCheckpoint.isDefined && !imsCheckpoint.get.isDone) return + val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt if (countToDelete > 0) { val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)