Skip to content

Commit d02dbaa

Browse files
committed
Expose Spark version and include it in event logs
This allows us to deal with inconsistencies in event log version incompatibilities in the future.
1 parent 2282300 commit d02dbaa

File tree

3 files changed

+40
-14
lines changed

3 files changed

+40
-14
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
775775
listenerBus.addListener(listener)
776776
}
777777

778+
/** The version of Spark on which this application is running. */
779+
def version = SparkContext.SPARK_VERSION
780+
778781
/**
779782
* Return a map from the slave to the max memory available for caching and the remaining
780783
* memory available for caching.
@@ -1213,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging {
12131216
*/
12141217
object SparkContext extends Logging {
12151218

1219+
private[spark] val SPARK_VERSION = "1.0.0"
1220+
12161221
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
12171222

12181223
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
1919

2020
import org.json4s.jackson.JsonMethods._
2121

22-
import org.apache.spark.{Logging, SparkConf}
22+
import org.apache.spark.{Logging, SparkConf, SparkContext}
2323
import org.apache.spark.io.CompressionCodec
2424
import org.apache.spark.util.{JsonProtocol, FileLogger}
2525

@@ -58,6 +58,7 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
5858
val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
5959
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
6060
}
61+
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
6162
logger.newFile(LOG_PREFIX + logger.fileIndex)
6263
}
6364

@@ -111,22 +112,33 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
111112
}
112113

113114
private[spark] object EventLoggingListener {
115+
val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
114116
val LOG_PREFIX = "EVENT_LOG_"
115117
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
116118
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
117119

120+
def isSparkVersionFile(fileName: String): Boolean = {
121+
fileName.startsWith(SPARK_VERSION_PREFIX)
122+
}
123+
118124
def isEventLogFile(fileName: String): Boolean = {
119-
fileName.contains(LOG_PREFIX)
125+
fileName.startsWith(LOG_PREFIX)
120126
}
121127

122128
def isCompressionCodecFile(fileName: String): Boolean = {
123-
fileName.contains(COMPRESSION_CODEC_PREFIX)
129+
fileName.startsWith(COMPRESSION_CODEC_PREFIX)
124130
}
125131

126132
def isApplicationCompleteFile(fileName: String): Boolean = {
127133
fileName == APPLICATION_COMPLETE
128134
}
129135

136+
def parseSparkVersion(fileName: String): String = {
137+
if (isSparkVersionFile(fileName)) {
138+
fileName.replaceAll(SPARK_VERSION_PREFIX, "")
139+
} else ""
140+
}
141+
130142
def parseCompressionCodec(fileName: String): String = {
131143
if (isCompressionCodecFile(fileName)) {
132144
fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ private[spark] class ReplayListenerBus(
4343
def this(logDir: String) = this(logDir, Utils.getHadoopFileSystem(logDir))
4444

4545
private var applicationComplete = false
46+
private var sparkVersion: Option[String] = None
4647
private var compressionCodec: Option[CompressionCodec] = None
4748
private var logPaths = Array[Path]()
4849
private var started = false
@@ -52,25 +53,27 @@ private[spark] class ReplayListenerBus(
5253
* Prepare state for reading event logs.
5354
*
5455
* This gathers relevant files in the given directory and extracts meaning from each category.
55-
* More specifically, this involves looking for event logs, the compression codec file
56-
* (if event logs are compressed), and the application completion file (if the application
57-
* has run to completion).
56+
* More specifically, this involves looking for event logs, the Spark version file, the
57+
* compression codec file (if event logs are compressed), and the application completion
58+
* file (if the application has run to completion).
5859
*/
5960
def start() {
6061
val filePaths = getFilePaths(logDir, fileSystem)
61-
logPaths = filePaths.filter { file => EventLoggingListener.isEventLogFile(file.getName) }
62-
compressionCodec =
63-
filePaths.find { file =>
64-
EventLoggingListener.isCompressionCodecFile(file.getName)
65-
}.map { file =>
62+
logPaths = filePaths
63+
.filter { file => EventLoggingListener.isEventLogFile(file.getName) }
64+
sparkVersion = filePaths
65+
.find { file => EventLoggingListener.isSparkVersionFile(file.getName) }
66+
.map { file => EventLoggingListener.parseSparkVersion(file.getName) }
67+
compressionCodec = filePaths
68+
.find { file => EventLoggingListener.isCompressionCodecFile(file.getName) }
69+
.map { file =>
6670
val codec = EventLoggingListener.parseCompressionCodec(file.getName)
6771
val conf = new SparkConf
6872
conf.set("spark.io.compression.codec", codec)
6973
CompressionCodec.createCodec(conf)
7074
}
71-
applicationComplete = filePaths.exists { file =>
72-
EventLoggingListener.isApplicationCompleteFile(file.getName)
73-
}
75+
applicationComplete = filePaths
76+
.exists { file => EventLoggingListener.isApplicationCompleteFile(file.getName) }
7477
started = true
7578
}
7679

@@ -80,6 +83,12 @@ private[spark] class ReplayListenerBus(
8083
applicationComplete
8184
}
8285

86+
/** Return the version of Spark on which the given application was run. */
87+
def getSparkVersion: String = {
88+
assert(started, "ReplayListenerBus not started yet")
89+
sparkVersion.getOrElse("<Unknown>")
90+
}
91+
8392
/**
8493
* Replay each event in the order maintained in the given logs. This should only be called
8594
* exactly once. Return whether event logs are actually found.

0 commit comments

Comments
 (0)