From db79fdd674d58af4e4ee3c2a34b3b58a3467904f Mon Sep 17 00:00:00 2001 From: shahid Date: Mon, 9 Dec 2019 22:47:35 +0530 Subject: [PATCH 1/2] Add incremental parsing of the eventlog --- .../deploy/history/FsHistoryProvider.scala | 122 ++++++++++++++---- .../spark/internal/config/History.scala | 4 + .../spark/scheduler/ReplayListenerBus.scala | 26 ++-- .../spark/status/AppHistoryServerPlugin.scala | 1 + .../spark/status/AppStatusListener.scala | 37 ++++++ .../org/apache/spark/status/LiveEntity.scala | 14 +- .../org/apache/spark/status/storeTypes.scala | 21 +++ .../history/FsHistoryProviderSuite.scala | 51 +++++++- .../execution/ui/SQLAppStatusListener.scala | 33 ++++- .../sql/execution/ui/SQLAppStatusStore.scala | 11 ++ .../execution/ui/SQLHistoryServerPlugin.scala | 7 + .../ui/HiveThriftServer2AppStatusStore.scala | 11 ++ ...HiveThriftServer2HistoryServerPlugin.scala | 6 + .../ui/HiveThriftServer2Listener.scala | 26 ++++ 14 files changed, 324 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a3776b3ad756d..83bae4e5916b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -129,6 +129,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_)) private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING) + // If incremental parsing support configuration is enabled, underlying store will not close + // during invalidate UI or detached UI. Metadata of the event read will store in the + // `IncrmentalInfo`. Whenever a new event come, parsing will happen from the line it + // read last time. Currently it supports inmemory store. TODO: Support for disk store. + private val isIncrementalParsingEnabled = storePath.isEmpty && + conf.get(History.INCREMENTAL_PARSING_ENABLED) + private val storeMap = new ConcurrentHashMap[(String, Option[String]), KVStore]() // Visible for testing. private[history] val listing: KVStore = storePath.map { path => @@ -342,7 +349,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) loadDiskStore(sm, appId, attempt) case _ => - createInMemoryStore(attempt) + createInMemoryStore(appId, attempt) } } catch { case _: FileNotFoundException => @@ -411,19 +418,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val uiOption = synchronized { activeUIs.remove((appId, attemptId)) } - uiOption.foreach { loadedUI => - loadedUI.lock.writeLock().lock() - try { - loadedUI.ui.store.close() - } finally { - loadedUI.lock.writeLock().unlock() - } + // If incremental parsing is enabled, will not close the underlying store. + if (!isIncrementalParsingEnabled) { + uiOption.foreach { loadedUI => + loadedUI.lock.writeLock().lock() + try { + loadedUI.ui.store.close() + } finally { + loadedUI.lock.writeLock().unlock() + } - diskManager.foreach { dm => - // If the UI is not valid, delete its files from disk, if any. This relies on the fact that - // ApplicationCache will never call this method concurrently with getAppUI() for the same - // appId / attemptId. - dm.release(appId, attemptId, delete = !loadedUI.valid) + diskManager.foreach { dm => + // If the UI is not valid, delete its files from disk, if any. 
This relies on the fact + // that ApplicationCache will never call this method concurrently with getAppUI() for + // the same appId / attemptId. + dm.release(appId, attemptId, delete = !loadedUI.valid) + } } } } @@ -615,10 +625,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } maybeUI.foreach { ui => + if (isIncrementalParsingEnabled) { + storeMap.remove(appId -> attemptId) + } ui.invalidate() ui.ui.store.close() } + if (isIncrementalParsingEnabled) { + try { + listing.delete(classOf[IncrimentalMetaInfo], Array(Some(appId), attemptId)) + } catch { + case _: NoSuchElementException => + } + } diskManager.foreach(_.release(appId, attemptId, delete = true)) true } @@ -694,7 +714,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logInfo(s"Parsing $logPath for listing data...") val logFiles = reader.listEventLogFiles - parseAppEventLogs(logFiles, bus, !appCompleted, eventsFilter) + parseAppEventLogs(logFiles, bus, !appCompleted, None, eventsFilter) // If enabled above, the listing listener will halt parsing when there's enough information to // create a listing entry. When the app is completed, or fast parsing is disabled, we still need @@ -735,7 +755,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) source.next() } - bus.replay(source, lastFile.getPath.toString, !appCompleted, eventsFilter) + bus.replay(source, lastFile.getPath.toString, !appCompleted, eventsFilter, linesToSkip = -1) } } @@ -793,7 +813,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) synchronized { activeUIs.get((appId, attemptId)).foreach { ui => ui.invalidate() - ui.ui.store.close() + // If incremental parsing is enabled, will not close the underlying store + // on invalidate UI. + if (!isIncrementalParsingEnabled) { + ui.ui.store.close() + } } } } @@ -948,7 +972,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def rebuildAppStore( store: KVStore, reader: EventLogFileReader, - lastUpdated: Long): Unit = { + lastUpdated: Long, + incrimentInfo: Option[IncrimentalMetaInfo] = None): Unit = { // Disable async updates, since they cause higher memory usage, and it's ok to take longer // to parse the event logs in the SHS. 
val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false) @@ -956,16 +981,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val replayBus = new ReplayListenerBus() val listener = new AppStatusListener(trackingStore, replayConf, false, lastUpdateTime = Some(lastUpdated)) + + incrimentInfo.foreach(info => listener.initialize(info.appId, info.attemptId)) replayBus.addListener(listener) for { plugin <- loadPlugins() listener <- plugin.createListeners(conf, trackingStore) - } replayBus.addListener(listener) + } { + incrimentInfo.foreach(info => plugin.initialize(listener, info.appId, info.attemptId)) + replayBus.addListener(listener) + } try { logInfo(s"Parsing ${reader.rootPath} to re-build UI...") - parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed) + parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed, incrimentInfo) trackingStore.close(false) logInfo(s"Finished parsing ${reader.rootPath}") } catch { @@ -981,14 +1011,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logFiles: Seq[FileStatus], replayBus: ReplayListenerBus, maybeTruncated: Boolean, + info: Option[IncrimentalMetaInfo] = None, eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { // stop replaying next log files if ReplayListenerBus indicates some error or halt var continueReplay = true + var lineToSkip = info.map(_.lineToSkip).getOrElse(-1) + val fileToStart = info.map(_.fileIndex).getOrElse(0) + var fileIndex = 0 logFiles.foreach { file => - if (continueReplay) { + if (continueReplay && fileIndex >= fileToStart) { Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in => - continueReplay = replayBus.replay(in, file.getPath.toString, - maybeTruncated = maybeTruncated, eventsFilter = eventsFilter) + val result = replayBus.replay(in, file.getPath.toString, + maybeTruncated = maybeTruncated, eventsFilter = eventsFilter, lineToSkip) + continueReplay = result.success + // We need to reset the lineToSkip to -1 as we need to parse next file from the beginning + lineToSkip = -1 + if (info.isDefined && (!continueReplay || fileIndex >= logFiles.size -1)) { + val updatedInfo = info.get.copy(fileIndex = fileIndex, lineToSkip = result.linesRead) + listing.write(updatedInfo) + } + } + if (info.isDefined) { + fileIndex += 1 } } } @@ -1095,11 +1139,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata) } - private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { - val store = new InMemoryStore() + private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = { + val store = if (isIncrementalParsingEnabled) { + storeMap.getOrDefault(appId -> attempt.info.attemptId, new InMemoryStore()) + } else { + new InMemoryStore + } val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), attempt.lastIndex) - rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + // Incremental info is valid only if incremental parsing feature is enabled. 
+ val info: Option[IncrimentalMetaInfo] = try { + if (isIncrementalParsingEnabled) { + Some(listing.read(classOf[IncrimentalMetaInfo], Array(Some(appId), attempt.info.attemptId))) + } else None + } catch { + case _: NoSuchElementException => + val info = IncrimentalMetaInfo(appId, attempt.info.attemptId, + fileIndex = 0, lineToSkip = -1) + listing.write(info) + Some(info) + } + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime(), info) + if (isIncrementalParsingEnabled) { + storeMap.put(appId -> attempt.info.attemptId, store) + } store } @@ -1174,6 +1237,15 @@ private[history] case class LogInfo( lastIndex: Option[Long], isComplete: Boolean) +private[history] case class IncrimentalMetaInfo( + appId: String, + attemptId: Option[String], + fileIndex: Int, + lineToSkip: Int) { + @JsonIgnore @KVIndex + private def stage: Array[Option[String]] = Array(Some(appId), attemptId) +} + private[history] class AttemptInfoWrapper( val info: ApplicationAttemptInfo, val logPath: String, diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index ca9af316dffd0..0a0bac91a973d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -107,6 +107,10 @@ private[spark] object History { .toSequence .createWithDefault(Nil) + val INCREMENTAL_PARSING_ENABLED = ConfigBuilder("spark.history.incremental.parsing.enabled") + .booleanConf + .createWithDefault(true) + val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads") .intConf .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 60b6fe7a60915..10da948e629a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -55,9 +55,10 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { logData: InputStream, sourceName: String, maybeTruncated: Boolean = false, - eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Boolean = { + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER, + linesToSkip: Int = -1): ReplayResult = { val lines = Source.fromInputStream(logData)(Codec.UTF8).getLines() - replay(lines, sourceName, maybeTruncated, eventsFilter) + replay(lines, sourceName, maybeTruncated, eventsFilter, linesToSkip) } /** @@ -68,16 +69,20 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { lines: Iterator[String], sourceName: String, maybeTruncated: Boolean, - eventsFilter: ReplayEventsFilter): Boolean = { + eventsFilter: ReplayEventsFilter, + linesToSkip: Int): ReplayResult = { var currentLine: String = null var lineNumber: Int = 0 + var lastLine = linesToSkip val unrecognizedEvents = new scala.collection.mutable.HashSet[String] val unrecognizedProperties = new scala.collection.mutable.HashSet[String] try { val lineEntries = lines .zipWithIndex - .filter { case (line, _) => eventsFilter(line) } + .filter { case (line, index) => + index > linesToSkip && eventsFilter(line) + } while (lineEntries.hasNext) { try { @@ -85,8 +90,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { currentLine = entry._1 lineNumber = entry._2 + 1 - 
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine))) + lastLine = entry._2 } catch { case e: ClassNotFoundException => // Ignore unknown events, parse through the event log file. @@ -116,18 +121,19 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { } } } - true + ReplayResult(success = true, lastLine) } catch { case e: HaltReplayException => // Just stop replay. - false - case _: EOFException if maybeTruncated => false + ReplayResult(success = false, lastLine) + case _: EOFException if maybeTruncated => + ReplayResult(success = false, lastLine) case ioe: IOException => throw ioe case e: Exception => logError(s"Exception parsing Spark event log: $sourceName", e) logError(s"Malformed line #$lineNumber: $currentLine\n") - false + ReplayResult(success = false, lastLine) } } @@ -137,6 +143,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { } +private[spark] case class ReplayResult(success: Boolean, linesRead: Int) + /** * Exception that can be thrown by listeners to halt replay. This is handled by ReplayListenerBus * only, and will cause errors if thrown when using other bus implementations. diff --git a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala index 2e9a31d5ac69c..9497ef1220d39 100644 --- a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala +++ b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala @@ -31,6 +31,7 @@ private[spark] trait AppHistoryServerPlugin { */ def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] + def initialize(sparkListener: SparkListener, appId: String, attemptId: Option[String]): Unit = {} /** * Sets up UI of this plugin to rebuild the history UI. */ diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index df140ba7d1547..476ba1ba7dac5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -52,6 +52,8 @@ private[spark] class AppStatusListener( private var appInfo: v1.ApplicationInfo = null private var appSummary = new AppSummary(0, 0) private var coresPerTask: Int = 1 + private var appId: String = _ + private var attemptId: Option[String] = None // How often to update live entities. -1 means "never update" when replaying applications, // meaning only the last write will happen. 
For live applications, this avoids a few @@ -100,6 +102,41 @@ private[spark] class AppStatusListener( if (!live) { val now = System.nanoTime() flush(update(_, now)) + if (appId != null) { + // If incremental parsing is enabled, write the listener data to the store + val data = new AppStatusListenerData(appId, attemptId, liveStages, liveJobs, + liveExecutors, deadExecutors, liveTasks, liveRDDs, + pools, appInfo, coresPerTask, appSummary, activeExecutorCount) + kvstore.write(data) + } + } + } + + def initialize(appId: String, attemptId: Option[String]): Unit = { + if (!live) { + // If incremental parsing is enabled, read and update the listener data + // from store + this.appId = appId + this.attemptId = attemptId + try { + val listenerData = kvstore.read(classOf[AppStatusListenerData], + Array(Some(appId), attemptId)) + listenerData.liveStages.entrySet().asScala.foreach { entry => + liveStages.put(entry.getKey, entry.getValue) + } + listenerData.liveJobs.map{entry => liveJobs.put(entry._1, entry._2)} + listenerData.liveExecutors.map{entry => liveExecutors.put(entry._1, entry._2)} + listenerData.deadExecutors.map{entry => deadExecutors.put(entry._1, entry._2)} + listenerData.liveTasks.map{entry => liveTasks.put(entry._1, entry._2)} + listenerData.liveRDDs.map{entry => liveRDDs.put(entry._1, entry._2)} + listenerData.pools.map{entry => pools.put(entry._1, entry._2)} + appInfo = listenerData.appInfo + appSummary = listenerData.appSummary + coresPerTask = listenerData.coresPerTask + activeExecutorCount = listenerData.activeExecutorCount + } catch { + case _: NoSuchElementException => + } } } diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 5ac7a56b216f2..5e204a216ffda 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -59,7 +59,7 @@ private[spark] abstract class LiveEntity { } -private class LiveJob( +private[status] class LiveJob( val jobId: Int, name: String, description: Option[String], @@ -116,7 +116,7 @@ private class LiveJob( } -private class LiveTask( +private[status] class LiveTask( var info: TaskInfo, stageId: Int, stageAttemptId: Int, @@ -244,7 +244,7 @@ private class LiveTask( } -private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { +private[status] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { var hostPort: String = null var host: String = null @@ -331,7 +331,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE } } -private class LiveExecutorStageSummary( +private[status] class LiveExecutorStageSummary( stageId: Int, attemptId: Int, executorId: String) extends LiveEntity { @@ -368,7 +368,7 @@ private class LiveExecutorStageSummary( } -private class LiveStage extends LiveEntity { +private[status] class LiveStage extends LiveEntity { import LiveEntityHelpers._ @@ -550,7 +550,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { * RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later stage * it started after the RDD is marked for caching. 
*/ -private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { +private[status] class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { import LiveEntityHelpers._ @@ -615,7 +615,7 @@ private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends Liv } -private class SchedulerPool(name: String) extends LiveEntity { +private[status] class SchedulerPool(name: String) extends LiveEntity { var stageIds = Set[Int]() diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index f0a94d84d8a04..fb7903ca9c8d3 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -19,6 +19,9 @@ package org.apache.spark.status import java.lang.{Long => JLong} import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.mutable import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize @@ -443,6 +446,24 @@ private[spark] class PoolData( @KVIndexParam val name: String, val stageIds: Set[Int]) +private[spark] class AppStatusListenerData( + val appId: String, + val attemptId: Option[String], + val liveStages: ConcurrentHashMap[(Int, Int), LiveStage], + val liveJobs: mutable.HashMap[Int, LiveJob], + val liveExecutors: mutable.HashMap[String, LiveExecutor], + val deadExecutors: mutable.HashMap[String, LiveExecutor], + val liveTasks: mutable.HashMap[Long, LiveTask], + val liveRDDs: mutable.HashMap[Int, LiveRDD], + val pools: mutable.HashMap[String, SchedulerPool], + val appInfo: ApplicationInfo, + val coresPerTask: Int, + val appSummary: AppSummary, + val activeExecutorCount: Int) { + + @KVIndex @JsonIgnore + def key: Array[Option[String]] = Array(Some(appId), attemptId) +} /** * A class with information about an app, to be used by the UI. There's only one instance of * this summary per application, so its ID in the store is the class name. 
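The core-side changes above boil down to one mechanism: `ReplayListenerBus.replay` now returns a `ReplayResult(success, linesRead)` instead of a plain `Boolean`, and `FsHistoryProvider` persists that position in `IncrimentalMetaInfo` so the next update can resume where the previous pass stopped instead of re-reading the whole event log. What follows is a minimal, self-contained sketch of that skip-and-resume loop, not the patched class itself; `parseEvent` is a hypothetical stand-in for `JsonProtocol.sparkEventFromJson` plus `postToAll`, and the error handling is collapsed into a single catch for brevity.

```scala
// Sketch of the skip-and-resume replay loop added to ReplayListenerBus.
// `linesToSkip` is the index of the last line parsed on the previous pass (-1 if none).
case class ReplayResult(success: Boolean, linesRead: Int)

object IncrementalReplaySketch {
  def replay(
      lines: Iterator[String],
      linesToSkip: Int,
      parseEvent: String => Unit): ReplayResult = {
    var lastLine = linesToSkip
    try {
      // zipWithIndex tracks each line's absolute position in the file, so lines
      // already handled by an earlier pass are filtered out before parsing.
      lines.zipWithIndex
        .filter { case (_, index) => index > linesToSkip }
        .foreach { case (line, index) =>
          parseEvent(line)   // decode the event and post it to the listeners
          lastLine = index   // remember the last line that was successfully handled
        }
      ReplayResult(success = true, lastLine)
    } catch {
      case _: Exception =>
        // Even on failure, report how far we got so the caller can persist it.
        ReplayResult(success = false, lastLine)
    }
  }
}
```

On the next update cycle, `parseAppEventLogs` feeds `ReplayResult.linesRead` back in as `lineToSkip` (stored in `IncrimentalMetaInfo` together with the file index), so only events appended since the previous pass are parsed.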
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index ed195dd44e917..53281bc4d91fb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -46,10 +46,11 @@ import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.security.GroupMappingServiceProvider -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusListenerData, AppStatusStore} import org.apache.spark.status.KVUtils.KVStoreScalaSerializer import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} +import org.apache.spark.util.kvstore.InMemoryStore import org.apache.spark.util.logging.DriverLogger class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { @@ -190,6 +191,54 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { provider.mergeApplicationListingCall should be (1) } + test("support incremental parsing of the event logs") { + val provider = new FsHistoryProvider(createTestConf(true)) + + var store: InMemoryStore = null + val logFile1 = newLogFile("app1", None, inProgress = true) + writeFile(logFile1, None, + SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None), + SparkListenerJobStart(0, 2L, Seq()) + ) + updateAndCheck(provider) { list => + list.size should be (1) + provider.getAttempt("app1", None).logPath should endWith(EventLogFileWriter.IN_PROGRESS) + val appUi = provider.getAppUI("app1", None) + appUi should not be null + store = appUi.get.ui.store.store.asInstanceOf[InMemoryStore] + } + + writeFile(logFile1, None, + SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None), + SparkListenerJobStart(0, 2L, Seq()), + SparkListenerJobEnd(0, 3L, JobSucceeded) + ) + + updateAndCheck(provider) { list => + store should not be null + store.read(classOf[AppStatusListenerData], Array(Some("app1"), None)) should not be null + list.size should be (1) + provider.getAttempt("app1", None).logPath should endWith(EventLogFileWriter.IN_PROGRESS) + val appUi = provider.getAppUI("app1", None) + appUi should not be null + } + + writeFile(logFile1, None, + SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None), + SparkListenerJobStart(0, 2L, Seq()), + SparkListenerJobEnd(0, 3L, JobSucceeded), + SparkListenerApplicationEnd(4L) + ) + + logFile1.renameTo(newLogFile("app1", None, inProgress = false)) + updateAndCheck(provider) { list => + list.size should be (1) + provider.getAttempt("app1", None).logPath should not endWith(EventLogFileWriter.IN_PROGRESS) + val appUi = provider.getAppUI("app1", None) + appUi should not be null + } + } + test("history file is renamed from inprogress to completed") { val provider = new FsHistoryProvider(createTestConf()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index f207d352573de..d3dafe8c74dab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -42,11 +42,13 @@ class 
SQLAppStatusListener( // never flush (only do the very last write). private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + private var appId: String = _ + private var attemptId: Option[String] = None // Live tracked data is needed by the SQL status store to calculate metrics for in-flight // executions; that means arbitrary threads may be querying these maps, so they need to be // thread-safe. - private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() - private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() + private[ui] val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private[ui] val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() // Returns true if this listener has no live data. Exposed for tests only. private[sql] def noLiveData(): Boolean = { @@ -66,6 +68,29 @@ class SQLAppStatusListener( // away. exec.metricsValues = aggregateMetrics(exec) exec.write(kvstore, now) + if (appId != null) { + kvstore.write(new SQLAppStatusListenerData(appId, attemptId, + liveExecutions, stageMetrics)) + } + } + } + } + + def initialize(appId: String, attemptId: Option[String]): Unit = { + if (!live) { + this.appId = appId + this.attemptId = attemptId + try { + val listenerData = kvstore.read(classOf[SQLAppStatusListenerData], + Array(Some(appId), attemptId)) + listenerData.liveExecutions.entrySet().asScala.foreach { entry => + liveExecutions.put(entry.getKey, entry.getValue) + } + listenerData.stageMetrics.entrySet().asScala.foreach { entry => + stageMetrics.put(entry.getKey, entry.getValue) + } + } catch { + case _: NoSuchElementException => } } } @@ -404,7 +429,7 @@ class SQLAppStatusListener( } -private class LiveExecutionData(val executionId: Long) extends LiveEntity { +private[ui] class LiveExecutionData(val executionId: Long) extends LiveEntity { var description: String = null var details: String = null @@ -439,7 +464,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { } -private class LiveStageMetrics( +private[ui] class LiveStageMetrics( val attemptId: Int, val numTasks: Int, val accumulatorIds: Set[Long]) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index c6e7f3978469d..772ad4e91d910 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui import java.lang.{Long => JLong} import java.util.Date +import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -143,3 +144,13 @@ case class SQLPlanMetric( name: String, accumulatorId: Long, metricType: String) + +class SQLAppStatusListenerData( + val appId: String, + val attemptId: Option[String], + val liveExecutions: ConcurrentHashMap[Long, LiveExecutionData], + val stageMetrics: ConcurrentHashMap[Int, LiveStageMetrics]) { + + @KVIndex @JsonIgnore + def key: Array[Option[String]] = Array(Some(appId), attemptId) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala index 5bf1ce5eb8a90..2bbdc3afe0e71 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala @@ -34,6 +34,13 @@ class SQLHistoryServerPlugin extends AppHistoryServerPlugin { } } + override def initialize( + sparkListener: SparkListener, + appId: String, + attemptId: Option[String]): Unit = { + sparkListener.asInstanceOf[SQLAppStatusListener].initialize(appId, attemptId) + } + override def displayOrder: Int = 0 } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala index 5cb78f6e64650..6c177a422db93 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.thriftserver.ui +import java.util.concurrent.ConcurrentHashMap + import com.fasterxml.jackson.annotation.JsonIgnore import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -130,3 +132,12 @@ private[thriftserver] class ExecutionInfo( } } } + +class HiveThriftserver2ListenerData( + val appId: String, + val attemptId: Option[String], + val sessionList: ConcurrentHashMap[String, LiveSessionData], + val executionList: ConcurrentHashMap[String, LiveExecutionData]) { + @KVIndex @JsonIgnore + def key: Array[Option[String]] = Array(Some(appId), attemptId) +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala index aec4125801f68..cd4bfef178e62 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala @@ -28,6 +28,12 @@ class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin { Seq(new HiveThriftServer2Listener(store, conf, None, false)) } + override def initialize( + sparkListener: SparkListener, + appId: String, + attemptId: Option[String]): Unit = { + sparkListener.asInstanceOf[HiveThriftServer2Listener].initialize(appId, attemptId) + } override def setupUI(ui: SparkUI): Unit = { val store = new HiveThriftServer2AppStatusStore(ui.store.store) if (store.getSessionCount > 0) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 6d0a506fa94dc..aef8795c50e20 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -43,6 +43,9 @@ private[thriftserver] class HiveThriftServer2Listener( private val sessionList = new ConcurrentHashMap[String, LiveSessionData]() private val executionList = new ConcurrentHashMap[String, LiveExecutionData]() + private var appId: String = _ + private var attemptId: Option[String] 
= None + private val (retainedStatements: Int, retainedSessions: Int) = { (sparkConf.get(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT), sparkConf.get(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)) @@ -69,6 +72,29 @@ private[thriftserver] class HiveThriftServer2Listener( kvstore.onFlush { if (!live) { flush((entity: LiveEntity) => updateStoreWithTriggerEnabled(entity)) + if (appId != null) { + kvstore.write(new HiveThriftserver2ListenerData( + appId, attemptId, sessionList, executionList)) + } + } + } + + def initialize(appId: String, attemptId: Option[String]): Unit = { + if (!live) { + this.appId = appId + this.attemptId = attemptId + try { + val listenerData = kvstore.read(classOf[HiveThriftserver2ListenerData], + Array(Some(appId), attemptId)) + listenerData.executionList.entrySet().asScala.foreach { entry => + executionList.put(entry.getKey, entry.getValue) + } + listenerData.sessionList.entrySet().asScala.foreach { entry => + sessionList.put(entry.getKey, entry.getValue) + } + } catch { + case _: NoSuchElementException => + } } } From b6b2c5a3cc46d25595bfa7e187846187dd5ac806 Mon Sep 17 00:00:00 2001 From: shahid Date: Tue, 10 Dec 2019 03:20:54 +0530 Subject: [PATCH 2/2] nit space --- .../org/apache/spark/status/AppHistoryServerPlugin.scala | 4 ++++ .../ui/HiveThriftServer2HistoryServerPlugin.scala | 1 + 2 files changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala index 9497ef1220d39..dd4092375e2ac 100644 --- a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala +++ b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala @@ -31,7 +31,11 @@ private[spark] trait AppHistoryServerPlugin { */ def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] + /** + * Initialize the listener parameters for incremental parsing of the event logs. + */ def initialize(sparkListener: SparkListener, appId: String, attemptId: Option[String]): Unit = {} + /** * Sets up UI of this plugin to rebuild the history UI. */ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala index cd4bfef178e62..00ad6d835ac54 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala @@ -34,6 +34,7 @@ class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin { attemptId: Option[String]): Unit = { sparkListener.asInstanceOf[HiveThriftServer2Listener].initialize(appId, attemptId) } + override def setupUI(ui: SparkUI): Unit = { val store = new HiveThriftServer2AppStatusStore(ui.store.store) if (store.getSessionCount > 0) {
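Taken together, the listener-side changes in this patch series (`AppStatusListener`, `SQLAppStatusListener`, `HiveThriftServer2Listener`) all follow the same checkpoint/restore pattern: on flush, snapshot the live in-memory maps into the KVStore under an `(appId, attemptId)` key, and in the new `initialize` hook, read that snapshot back to repopulate the maps, ignoring the `NoSuchElementException` that occurs on the first pass. The sketch below shows that pattern in isolation under stated assumptions: `SketchListenerData` and `SketchListener` are hypothetical illustration names, and a plain `ConcurrentHashMap` stands in for the real `ElementTrackingStore`.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical analogue of AppStatusListenerData / SQLAppStatusListenerData /
// HiveThriftserver2ListenerData: a snapshot of a listener's live state keyed
// by (appId, attemptId).
case class SketchListenerData(
    appId: String,
    attemptId: Option[String],
    liveEntities: Map[String, String])

class SketchListener(
    store: ConcurrentHashMap[(String, Option[String]), SketchListenerData]) {

  private val liveEntities = new ConcurrentHashMap[String, String]()
  private var appId: String = _
  private var attemptId: Option[String] = None

  // Mirrors the initialize(appId, attemptId) hook the patch adds to each listener:
  // restore the previously flushed state so an incremental pass can continue from it.
  def initialize(appId: String, attemptId: Option[String]): Unit = {
    this.appId = appId
    this.attemptId = attemptId
    Option(store.get(appId -> attemptId)).foreach { data =>
      data.liveEntities.foreach { case (k, v) => liveEntities.put(k, v) }
    }
  }

  // Normal event handling updates the live maps, as the real listeners do.
  def onEvent(key: String, value: String): Unit = liveEntities.put(key, value)

  // Mirrors the onFlush hook: persist the live state so that a later
  // incremental pass can pick it up again via initialize().
  def flush(): Unit = {
    if (appId != null) {
      store.put(appId -> attemptId,
        SketchListenerData(appId, attemptId, liveEntities.asScala.toMap))
    }
  }
}
```

Per the patch, this path is only taken when replaying in the history server (`live == false`), is gated by `spark.history.incremental.parsing.enabled` (default `true`), and currently applies only when no local disk store path is configured, i.e. the in-memory KVStore case.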