@@ -129,6 +129,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
// If incremental parsing support configuration is enabled, underlying store will not close
Contributor comment:
  1. nit: "incremental parsing support configuration" seems odd. IMHO, just "incremental parsing" would work.
  2. There are two kinds of stores in FsHistoryProvider, so it may be better to clarify which one is meant here; "underlying app KVStore" or some better wording?

// during invalidate UI or detached UI. Metadata of the event read will store in the
Contributor comment:
nit: invalidating / detaching
nit2: "Metadata of reading the event log" (maybe there is better wording...) will be stored

// `IncrmentalInfo`. Whenever a new event come, parsing will happen from the line it
Contributor comment:
I guess "incremental parsing" already explains this, so the sentence looks redundant. Moreover, technically, parsing doesn't start immediately when a new event comes in.

// read last time. Currently it supports inmemory store. TODO: Support for disk store.
Contributor comment:
Please place the TODO on a separate comment line so that IDEs can highlight it.
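For example:

```scala
// read last time. Currently it supports the in-memory store.
// TODO: Support the disk store.
```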

private val isIncrementalParsingEnabled = storePath.isEmpty &&
Contributor comment:
We don't seem to add is/are to value names even when they are expected to be Boolean. The nearest example is fastInProgressParsing.

conf.get(History.INCREMENTAL_PARSING_ENABLED)
private val storeMap = new ConcurrentHashMap[(String, Option[String]), KVStore]()

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
@@ -342,7 +349,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
loadDiskStore(sm, appId, attempt)

case _ =>
createInMemoryStore(attempt)
createInMemoryStore(appId, attempt)
}
} catch {
case _: FileNotFoundException =>
@@ -411,19 +418,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val uiOption = synchronized {
activeUIs.remove((appId, attemptId))
}
uiOption.foreach { loadedUI =>
loadedUI.lock.writeLock().lock()
try {
loadedUI.ui.store.close()
} finally {
loadedUI.lock.writeLock().unlock()
}
// If incremental parsing is enabled, will not close the underlying store.
Contributor comment:
Looks redundant, as this is already explained above. Either one of the two comments would be sufficient.

if (!isIncrementalParsingEnabled) {
uiOption.foreach { loadedUI =>
loadedUI.lock.writeLock().lock()
try {
loadedUI.ui.store.close()
} finally {
loadedUI.lock.writeLock().unlock()
}

diskManager.foreach { dm =>
// If the UI is not valid, delete its files from disk, if any. This relies on the fact that
// ApplicationCache will never call this method concurrently with getAppUI() for the same
// appId / attemptId.
dm.release(appId, attemptId, delete = !loadedUI.valid)
diskManager.foreach { dm =>
// If the UI is not valid, delete its files from disk, if any. This relies on the fact
// that ApplicationCache will never call this method concurrently with getAppUI() for
// the same appId / attemptId.
dm.release(appId, attemptId, delete = !loadedUI.valid)
}
}
}
}
@@ -615,10 +625,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

maybeUI.foreach { ui =>
if (isIncrementalParsingEnabled) {
storeMap.remove(appId -> attemptId)
}
ui.invalidate()
ui.ui.store.close()
}

if (isIncrementalParsingEnabled) {
Contributor comment:
I don't see a reason to maintain the read-progress metadata in the KVStore, given we lose everything upon an SHS restart. You may want to store it in another HashMap, or simply store it within storeMap. (You may want a dedicated class for the data structure of the storeMap entry.)
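For illustration, a sketch of that storeMap-only bookkeeping; the entry class name is invented here and is not part of the PR:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.kvstore.KVStore

// One map entry bundles the cached in-memory store with the replay progress,
// so nothing needs to be persisted in the listing KVStore.
private case class IncrementalStoreEntry(
    store: KVStore,
    fileIndex: Int,   // index of the event log file that was being replayed last
    lineToSkip: Int)  // last line already applied from that file, -1 if none

private val storeMap =
  new ConcurrentHashMap[(String, Option[String]), IncrementalStoreEntry]()
```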

try {
listing.delete(classOf[IncrimentalMetaInfo], Array(Some(appId), attemptId))
} catch {
case _: NoSuchElementException =>
}
}
diskManager.foreach(_.release(appId, attemptId, delete = true))
true
}
@@ -694,7 +714,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

logInfo(s"Parsing $logPath for listing data...")
val logFiles = reader.listEventLogFiles
parseAppEventLogs(logFiles, bus, !appCompleted, eventsFilter)
parseAppEventLogs(logFiles, bus, !appCompleted, None, eventsFilter)

// If enabled above, the listing listener will halt parsing when there's enough information to
// create a listing entry. When the app is completed, or fast parsing is disabled, we still need
@@ -735,7 +755,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
source.next()
}

bus.replay(source, lastFile.getPath.toString, !appCompleted, eventsFilter)
bus.replay(source, lastFile.getPath.toString, !appCompleted, eventsFilter, linesToSkip = -1)
}
}

@@ -793,7 +813,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
synchronized {
activeUIs.get((appId, attemptId)).foreach { ui =>
ui.invalidate()
ui.ui.store.close()
// If incremental parsing is enabled, will not close the underlying store
Contributor comment:
Same here: looks redundant, as this is already explained above. Either one would be sufficient.

// on invalidate UI.
if (!isIncrementalParsingEnabled) {
ui.ui.store.close()
}
}
}
}
@@ -948,24 +972,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def rebuildAppStore(
store: KVStore,
reader: EventLogFileReader,
lastUpdated: Long): Unit = {
lastUpdated: Long,
incrimentInfo: Option[IncrimentalMetaInfo] = None): Unit = {
Contributor comment:
nit: incrimentInfo -> incrementInfo.

// Disable async updates, since they cause higher memory usage, and it's ok to take longer
// to parse the event logs in the SHS.
val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
val trackingStore = new ElementTrackingStore(store, replayConf)
val replayBus = new ReplayListenerBus()
val listener = new AppStatusListener(trackingStore, replayConf, false,
lastUpdateTime = Some(lastUpdated))

incrimentInfo.foreach(info => listener.initialize(info.appId, info.attemptId))
replayBus.addListener(listener)

for {
plugin <- loadPlugins()
listener <- plugin.createListeners(conf, trackingStore)
} replayBus.addListener(listener)
} {
incrimentInfo.foreach(info => plugin.initialize(listener, info.appId, info.attemptId))
Contributor comment:
Personally I'd add a new abstraction (like RestorableSparkListener or SnapshottableSparkListener... better names are welcome) which has an initialize method, so that we can call initialize on the listener instead of on the plugin. But given that more changes are required, let's hear more voices from the committers.
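A minimal sketch of that abstraction, with a placeholder name (the PR does not define such a trait):

```scala
import org.apache.spark.scheduler.SparkListener

private[spark] trait RestorableSparkListener extends SparkListener {
  /** Restore previously accumulated state for this app attempt before replay resumes. */
  def initialize(appId: String, attemptId: Option[String]): Unit
}
```

rebuildAppStore could then call initialize on any listener that mixes in the trait, instead of routing the call through AppHistoryServerPlugin.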

replayBus.addListener(listener)
}

try {
logInfo(s"Parsing ${reader.rootPath} to re-build UI...")
parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed)
parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed, incrimentInfo)
trackingStore.close(false)
logInfo(s"Finished parsing ${reader.rootPath}")
} catch {
@@ -981,14 +1011,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logFiles: Seq[FileStatus],
replayBus: ReplayListenerBus,
maybeTruncated: Boolean,
info: Option[IncrimentalMetaInfo] = None,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
// stop replaying next log files if ReplayListenerBus indicates some error or halt
var continueReplay = true
var lineToSkip = info.map(_.lineToSkip).getOrElse(-1)
val fileToStart = info.map(_.fileIndex).getOrElse(0)
var fileIndex = 0
Contributor comment:
logFiles.drop(fileToStart).foreach would be enough to filter out already read files.
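A rough sketch of that simplification, reusing the PR's draft identifiers (untested; the metadata update is left out here):

```scala
var continueReplay = true
var lineToSkip = info.map(_.lineToSkip).getOrElse(-1)
val fileToStart = info.map(_.fileIndex).getOrElse(0)

// zipWithIndex first so the absolute file index is preserved after dropping already-read files
logFiles.zipWithIndex.drop(fileToStart).foreach { case (file, fileIndex) =>
  if (continueReplay) {
    Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in =>
      val result = replayBus.replay(in, file.getPath.toString,
        maybeTruncated = maybeTruncated, eventsFilter = eventsFilter, linesToSkip = lineToSkip)
      continueReplay = result.success
      // only the resumed file needs skipping; later files are replayed from their first line
      lineToSkip = -1
    }
  }
}
```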

logFiles.foreach { file =>
if (continueReplay) {
if (continueReplay && fileIndex >= fileToStart) {
Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in =>
continueReplay = replayBus.replay(in, file.getPath.toString,
maybeTruncated = maybeTruncated, eventsFilter = eventsFilter)
val result = replayBus.replay(in, file.getPath.toString,
maybeTruncated = maybeTruncated, eventsFilter = eventsFilter, lineToSkip)
continueReplay = result.success
// We need to reset the lineToSkip to -1 as we need to parse next file from the beginning
Contributor comment:
I'm not sure this is a good place to update the IncrementalMetaInfo:

  1. The method doesn't know whether the replayed events will be reflected in the app's KVStore. I know info will be None in that case, but it's still not easy to understand without knowing the full picture.

  2. The logic seems to break when an error is thrown. E.g., if this reads 3 files successfully and throws an error on the 4th file, the store may reflect some events from the 4th file (though I think they should be discarded), yet IncrementalMetaInfo is not updated to reflect the last status of the store.

So it may be better to propagate the information to the caller and let the caller deal with it (see the sketch below).

And as a side note, this will not work with compaction (#26416) for various reasons:

  1. Listing the log files will not start with index 0 if there is any compact file.
  2. The log file at the index we should restart from may have been removed (either compacted into a higher-index compact file, or an equal-index compact file). In either case, it should be replayed from the start (though a compact log file would be small enough to replay quickly).

But let's put that aside - if #26416 is merged earlier, this should be reconsidered.
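A sketch of the "propagate to the caller" idea referenced above; the ReplayProgress name is invented and not part of the PR:

```scala
// Returned by parseAppEventLogs instead of writing IncrimentalMetaInfo to the listing mid-replay.
private case class ReplayProgress(lastFileIndex: Int, lastLineRead: Int, success: Boolean)
```

rebuildAppStore would hand the progress back to createInMemoryStore, which records it only when the rebuilt store is actually kept; a failed rebuild is then discarded together with its progress, avoiding the inconsistency in point 2.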

lineToSkip = -1
if (info.isDefined && (!continueReplay || fileIndex >= logFiles.size -1)) {
val updatedInfo = info.get.copy(fileIndex = fileIndex, lineToSkip = result.linesRead)
listing.write(updatedInfo)
}
}
if (info.isDefined) {
fileIndex += 1
}
}
}
@@ -1095,11 +1139,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
KVUtils.open(newStorePath, metadata)
}

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
val store = new InMemoryStore()
private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = {
val store = if (isIncrementalParsingEnabled) {
Contributor comment:
The code is hard to read.

  1. reader can be initialized in the first line.
  2. It would be better to have a big if-else, as I don't think there is enough shared code between the two cases.
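Roughly what that restructuring could look like, reusing the PR's identifiers; readOrCreateMetaInfo is a hypothetical helper standing in for the try/catch around listing.read:

```scala
private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = {
  // reader is needed by both branches, so build it up front
  val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), attempt.lastIndex)
  if (isIncrementalParsingEnabled) {
    val store = storeMap.getOrDefault(appId -> attempt.info.attemptId, new InMemoryStore())
    val info = readOrCreateMetaInfo(appId, attempt.info.attemptId)  // hypothetical helper
    rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime(), Some(info))
    storeMap.put(appId -> attempt.info.attemptId, store)
    store
  } else {
    val store = new InMemoryStore()
    rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
    store
  }
}
```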

storeMap.getOrDefault(appId -> attempt.info.attemptId, new InMemoryStore())
} else {
new InMemoryStore
}
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
// Incremental info is valid only if incremental parsing feature is enabled.
val info: Option[IncrimentalMetaInfo] = try {
if (isIncrementalParsingEnabled) {
Some(listing.read(classOf[IncrimentalMetaInfo], Array(Some(appId), attempt.info.attemptId)))
} else None
} catch {
case _: NoSuchElementException =>
val info = IncrimentalMetaInfo(appId, attempt.info.attemptId,
fileIndex = 0, lineToSkip = -1)
listing.write(info)
Some(info)
}
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime(), info)
Contributor comment:
We may want to discard the store when an exception is thrown from rebuildAppStore(); the IncrementalMetaInfo also becomes invalid then. Both should be discarded, from storeMap and from the listing.

if (isIncrementalParsingEnabled) {
storeMap.put(appId -> attempt.info.attemptId, store)
}
store
}

@@ -1174,6 +1237,15 @@ private[history] case class LogInfo(
lastIndex: Option[Long],
isComplete: Boolean)

private[history] case class IncrimentalMetaInfo(
Contributor comment:
nit: IncrementalMetaInfo.

But as I commented earlier, I don't see a reason why this is needed. It will be required once we support incremental parsing across SHS restarts.

appId: String,
attemptId: Option[String],
fileIndex: Int,
lineToSkip: Int) {
@JsonIgnore @KVIndex
private def stage: Array[Option[String]] = Array(Some(appId), attemptId)
}

private[history] class AttemptInfoWrapper(
val info: ApplicationAttemptInfo,
val logPath: String,
@@ -107,6 +107,10 @@ private[spark] object History {
.toSequence
.createWithDefault(Nil)

val INCREMENTAL_PARSING_ENABLED = ConfigBuilder("spark.history.incremental.parsing.enabled")
Contributor comment:
spark.history.fs.enableIncrementalParsing (or maybe someone can propose better naming)

Contributor comment:
And you may want to explain the feature at least in configuration.md.

.booleanConf
.createWithDefault(true)
Contributor comment:
I'd turn it off by default for a couple of bugfix (or even minor) versions.
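Putting the three suggestions together (rename, documentation, off by default), the definition might look roughly like this; the exact name and doc text are of course still open:

```scala
val INCREMENTAL_PARSING_ENABLED =
  ConfigBuilder("spark.history.fs.enableIncrementalParsing")
    .doc("When enabled, the history server keeps the in-memory app store across UI " +
      "detach/invalidate and only replays the portion of the event log written since the " +
      "last parse, instead of re-parsing the whole log. Only supported for the in-memory store.")
    .booleanConf
    .createWithDefault(false)
```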


val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
.intConf
.createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
@@ -55,9 +55,10 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
logData: InputStream,
sourceName: String,
maybeTruncated: Boolean = false,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Boolean = {
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER,
linesToSkip: Int = -1): ReplayResult = {
Contributor comment:
Please update the scaladoc accordingly.
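For example, the new parameter and return type could be documented along these lines (wording is only a suggestion):

```scala
/**
 * @param linesToSkip index of the last line that was already replayed in a previous pass;
 *                    lines up to and including this index are skipped. Use -1 to replay
 *                    from the start.
 * @return a ReplayResult indicating whether replay completed and the index of the last
 *         line that was read.
 */
```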

val lines = Source.fromInputStream(logData)(Codec.UTF8).getLines()
replay(lines, sourceName, maybeTruncated, eventsFilter)
replay(lines, sourceName, maybeTruncated, eventsFilter, linesToSkip)
}

/**
@@ -68,25 +69,29 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
lines: Iterator[String],
sourceName: String,
maybeTruncated: Boolean,
eventsFilter: ReplayEventsFilter): Boolean = {
eventsFilter: ReplayEventsFilter,
linesToSkip: Int): ReplayResult = {
var currentLine: String = null
var lineNumber: Int = 0
var lastLine = linesToSkip
val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
val unrecognizedProperties = new scala.collection.mutable.HashSet[String]

try {
val lineEntries = lines
.zipWithIndex
.filter { case (line, _) => eventsFilter(line) }
.filter { case (line, index) =>
index > linesToSkip && eventsFilter(line)
}

while (lineEntries.hasNext) {
try {
val entry = lineEntries.next()

currentLine = entry._1
lineNumber = entry._2 + 1

Contributor comment:
nit: I feel it's better to have an empty line between the two.

postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
lastLine = entry._2
} catch {
case e: ClassNotFoundException =>
// Ignore unknown events, parse through the event log file.
Expand Down Expand Up @@ -116,18 +121,19 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
}
}
}
true
ReplayResult(success = true, lastLine)
} catch {
case e: HaltReplayException =>
// Just stop replay.
false
case _: EOFException if maybeTruncated => false
ReplayResult(success = false, lastLine)
case _: EOFException if maybeTruncated =>
ReplayResult(success = false, lastLine)
case ioe: IOException =>
throw ioe
case e: Exception =>
logError(s"Exception parsing Spark event log: $sourceName", e)
logError(s"Malformed line #$lineNumber: $currentLine\n")
false
ReplayResult(success = false, lastLine)
}
}

@@ -137,6 +143,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {

}

private[spark] case class ReplayResult(success: Boolean, linesRead: Int)

/**
* Exception that can be thrown by listeners to halt replay. This is handled by ReplayListenerBus
* only, and will cause errors if thrown when using other bus implementations.
@@ -31,6 +31,11 @@ private[spark] trait AppHistoryServerPlugin {
*/
def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener]

/**
* Initialize the listener parameters for incremental parsing of the event logs.
*/
def initialize(sparkListener: SparkListener, appId: String, attemptId: Option[String]): Unit = {}

/**
* Sets up UI of this plugin to rebuild the history UI.
*/
@@ -52,6 +52,8 @@ private[spark] class AppStatusListener(
private var appInfo: v1.ApplicationInfo = null
private var appSummary = new AppSummary(0, 0)
private var coresPerTask: Int = 1
private var appId: String = _
private var attemptId: Option[String] = None

// How often to update live entities. -1 means "never update" when replaying applications,
// meaning only the last write will happen. For live applications, this avoids a few
@@ -100,6 +102,41 @@ private[spark] class AppStatusListener(
if (!live) {
val now = System.nanoTime()
flush(update(_, now))
if (appId != null) {
Contributor comment:
This should be placed before flush().
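I.e. roughly the following ordering, with the PR's own field names (sketch only):

```scala
if (!live) {
  val now = System.nanoTime()
  if (appId != null) {
    // snapshot the listener state for incremental parsing before the final flush
    val data = new AppStatusListenerData(appId, attemptId, liveStages, liveJobs,
      liveExecutors, deadExecutors, liveTasks, liveRDDs,
      pools, appInfo, coresPerTask, appSummary, activeExecutorCount)
    kvstore.write(data)
  }
  flush(update(_, now))
}
```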

// If incremental parsing is enabled, write the listener data to the store
val data = new AppStatusListenerData(appId, attemptId, liveStages, liveJobs,
liveExecutors, deadExecutors, liveTasks, liveRDDs,
pools, appInfo, coresPerTask, appSummary, activeExecutorCount)
kvstore.write(data)
}
}
}

def initialize(appId: String, attemptId: Option[String]): Unit = {
if (!live) {
// If incremental parsing is enabled, read and update the listener data
// from store
this.appId = appId
Contributor comment:
I'd rather have a case class (or simply a tuple) that stores this information together, with a name that clearly represents "incremental parsing is in use". Checking whether appId is set to determine whether incremental parsing is used wouldn't be easy to understand for a newcomer to the code.
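One possible shape, as a sketch (the names are illustrative only):

```scala
// Set only when the listener is being restored for incremental parsing; None means normal replay.
private case class IncrementalRestoreTarget(appId: String, attemptId: Option[String])
private var restoreTarget: Option[IncrementalRestoreTarget] = None
```

"Incremental parsing is in use" then reads as restoreTarget.isDefined rather than the less obvious appId != null check.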

this.attemptId = attemptId
try {
val listenerData = kvstore.read(classOf[AppStatusListenerData],
Array(Some(appId), attemptId))
listenerData.liveStages.entrySet().asScala.foreach { entry =>
liveStages.put(entry.getKey, entry.getValue)
}
listenerData.liveJobs.map{entry => liveJobs.put(entry._1, entry._2)}
listenerData.liveExecutors.map{entry => liveExecutors.put(entry._1, entry._2)}
listenerData.deadExecutors.map{entry => deadExecutors.put(entry._1, entry._2)}
listenerData.liveTasks.map{entry => liveTasks.put(entry._1, entry._2)}
listenerData.liveRDDs.map{entry => liveRDDs.put(entry._1, entry._2)}
listenerData.pools.map{entry => pools.put(entry._1, entry._2)}
appInfo = listenerData.appInfo
appSummary = listenerData.appSummary
coresPerTask = listenerData.coresPerTask
activeExecutorCount = listenerData.activeExecutorCount
} catch {
case _: NoSuchElementException =>
}
}
}
