Commits (74)
4180993
Modified SparkContext to retain spark.unique.app.name property in Spa…
sarutak Sep 3, 2014
55debab
Modified SparkContext and Executor to set spark.executor.id to identi…
sarutak Sep 3, 2014
71609f5
Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockMa…
sarutak Sep 3, 2014
868e326
Modified MetricsSystem to set registry name with unique application-i…
sarutak Sep 3, 2014
85ffc02
Revert "Modified sourceName of ExecutorSource, DAGSchedulerSource and…
sarutak Sep 3, 2014
4e057c9
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 3, 2014
6fc5560
Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockMa…
sarutak Sep 3, 2014
6f7dcd4
Modified constructor of DAGSchedulerSource and BlockManagerSource bec…
sarutak Sep 3, 2014
15f88a3
Modified MetricsSystem#buildRegistryName because conf.get does not re…
sarutak Sep 3, 2014
fa7175b
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 4, 2014
4603a39
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 4, 2014
3e098d8
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 4, 2014
e4a4593
tmp
sarutak Sep 4, 2014
912a637
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 7, 2014
848819c
Merge branch 'metrics-structure-improvement' of github.com:sarutak/sp…
sarutak Sep 12, 2014
93e263a
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 12, 2014
45bd33d
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 12, 2014
7b67f5a
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 13, 2014
08e627e
Revert "tmp"
sarutak Sep 15, 2014
ead8966
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 15, 2014
3ea7896
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 16, 2014
2ec848a
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 17, 2014
efcb6e1
Modified to add application id to metrics name
sarutak Sep 17, 2014
4776f9e
Modified MetricsSystemSuite.scala
sarutak Sep 17, 2014
4a93c7f
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 19, 2014
e719c39
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 21, 2014
c229fbe
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 22, 2014
eea6e19
Modified CoarseGrainedMesosSchedulerBackend and MesosSchedulerBackend…
sarutak Sep 22, 2014
36d2f7a
Added warning message for the situation we cannot get application id …
sarutak Sep 22, 2014
e705386
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 24, 2014
086ee25
Merge branch 'metrics-structure-improvement2' of github.com:sarutak/s…
sarutak Sep 24, 2014
8a2b6ec
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 25, 2014
b311806
Swapped last 2 arguments passed to CoarseGrainedExecutorBackend
sarutak Sep 25, 2014
424fea4
Modified the subclasses of TaskScheduler and SchedulerBackend so tha…
sarutak Sep 25, 2014
203634e
Modified comment in SchedulerBackend#applicationId and TaskScheduler#…
sarutak Sep 25, 2014
28d4d93
Modified SparkContext and EventLoggingListener so that the directory …
sarutak Sep 25, 2014
bcf25bf
Modified directory name for EventLogs
sarutak Sep 25, 2014
0f890e6
Modified SparkDeploySchedulerBackend and Master to pass baseLogDir in…
sarutak Sep 26, 2014
eabda80
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 26, 2014
0a2fc14
Modified style
sarutak Sep 26, 2014
0325caf
Added ApplicationId.scala
sarutak Sep 26, 2014
6a91b14
Modified SparkContextSchedulerCreationSuite, ExecutorRunnerTest and E…
sarutak Sep 28, 2014
4567ffc
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 29, 2014
9ff4851
Modified MimaExcludes.scala to ignore createTaskScheduler method in S…
sarutak Sep 29, 2014
dfc83fd
Modified ApplicationId to implement Serializable
sarutak Sep 29, 2014
d009c55
Modified ApplicationId#equals to compare appIds
sarutak Sep 29, 2014
3011efc
Added ApplicationIdSuite.scala
sarutak Sep 29, 2014
9aadb0b
Modified NetworkReceiverSuite to ensure "executor.start()" is finishe…
sarutak Sep 29, 2014
2cdd009
Modified defailt implementation of applicationId
sarutak Sep 29, 2014
97cb85c
Modified confliction of MimExcludes
sarutak Sep 29, 2014
1b8b53e
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 30, 2014
f6af132
Modified SchedulerBackend and TaskScheduler to return System.currentT…
sarutak Sep 30, 2014
248935d
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 30, 2014
42bea55
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 30, 2014
0fc1b09
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Oct 1, 2014
a42300c
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Oct 2, 2014
0413b90
Deleted ApplicationId.java and ApplicationIdSuite.java
sarutak Oct 2, 2014
6434b06
Reverted changes related to ApplicationId
sarutak Oct 2, 2014
16a9f01
Added data types to be returned to some methods
sarutak Oct 2, 2014
5cca0d2
Added Javadoc comment to SparkContext#getApplicationId
sarutak Oct 2, 2014
69c46a6
Added warning logging logic to MetricsSystem#buildRegistryName
sarutak Oct 2, 2014
ff45c89
Added some test cases to MetricsSystemSuite
sarutak Oct 2, 2014
389090d
Replaced taskScheduler.applicationId() with getApplicationId in Spark…
sarutak Oct 2, 2014
2cf8a0f
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Oct 2, 2014
f9b6fb3
Modified style.
sarutak Oct 2, 2014
59cc2cd
Modified SparkContextSchedulerCreationSuite
sarutak Oct 2, 2014
f0c7fba
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Oct 2, 2014
990c078
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Oct 3, 2014
10be654
Fixed style.
sarutak Oct 3, 2014
67fa5eb
Unified MetricsSystem#registerSources and registerSinks in start
sarutak Oct 3, 2014
817e4f0
Simplified MetricsSystem#buildRegistryName
sarutak Oct 3, 2014
6570494
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Oct 3, 2014
39169e4
Fixed style
sarutak Oct 3, 2014
3288b2b
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Oct 3, 2014
52 changes: 36 additions & 16 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
private[spark] val eventLogDir: Option[String] = {
if (isEventLogEnabled) {
Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
} else {
None
}
}

Contributor:

Now that you don't pass this into the task scheduler, you don't even need this variable anymore. Then you can do the check for whether event logging is enabled down where you instantiate the EventLoggingListener.

Member Author:

I think those are needed because SparkDeploySchedulerBackend references sc.eventLogDir.
We cannot pass eventLogDir only to EventLoggingListener and have SparkDeploySchedulerBackend read it through EventLoggingListener, because EventLoggingListener has to be instantiated after the task scheduler is created, so the task scheduler cannot refer to it.

SparkDeploySchedulerBackend and EventLoggingListener could each read eventLogDir themselves via conf.get, but with that approach EventLoggingListener would have to check whether event logging is enabled before reading eventLogDir, even though SparkContext already performs that check.
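
For illustration, the initialization order this change relies on in SparkContext looks roughly like the sketch below (simplified, not the literal diff; names follow the code above):

// Simplified sketch of the ordering, for illustration only.
private[spark] val eventLogDir: Option[String] =
  if (isEventLogEnabled) Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)) else None
val (schedulerBackend, taskScheduler) =
  SparkContext.createTaskScheduler(this, master)            // SparkDeploySchedulerBackend reads sc.eventLogDir here
taskScheduler.start()
val applicationId: String = taskScheduler.applicationId()   // the app ID is only known after start()
private[spark] val eventLogger: Option[EventLoggingListener] =
  eventLogDir.map { dir =>
    // ...so the listener can only be constructed after the scheduler has started
    new EventLoggingListener(applicationId, dir, conf, hadoopConfiguration)
  }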

Contributor:

Oh I see, it's because of this new dependency order. Alright, I guess we have no choice but to keep them then. Either way these should be private[spark] instead of completely public.

Member Author:

So what we have to do is make isEventLogEnabled and eventLogDir private[spark], right?

Contributor:

yes

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val listenerBus = new LiveListenerBus

// Create the Spark execution environment (cache, map output tracker, etc)
conf.set("spark.executor.id", "driver")
Contributor:

What is this used for? The driver is technically not an executor (yes, we conflate the two elsewhere but that's bad too).

Contributor:

I think the idea is that certain types of metrics can be generated by components that run on both drivers and executors (e.g. BlockManager metrics) and we'd like to capture the location where the metric is generated as part of the metric name.
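
As a rough illustration of the resulting names (the IDs below are made up), registering the same source on the driver and on an executor now yields distinct metric names:

import com.codahale.metrics.MetricRegistry

// Hypothetical IDs, for illustration only.
val onDriver   = MetricRegistry.name("app-20141003012537-0000", "driver", "BlockManager")
val onExecutor = MetricRegistry.name("app-20141003012537-0000", "0", "BlockManager")
// onDriver   == "app-20141003012537-0000.driver.BlockManager"
// onExecutor == "app-20141003012537-0000.0.BlockManager"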

Contributor:

Yeah, I don't have a good alternative suggestion for this.

private[spark] val env = SparkEnv.create(
conf,
"<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

val startTime = System.currentTimeMillis()

// Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
// constructor
taskScheduler.start()

val applicationId: String = taskScheduler.applicationId()
conf.set("spark.app.id", applicationId)

val metricsSystem = env.metricsSystem

// The driver's metrics system needs spark.app.id to be set to the app ID,
// so it must be started only after we get the app ID from the task scheduler and set spark.app.id.
metricsSystem.start()
Contributor:

How about putting these three into MetricsSystem#initialize(), and adding a comment here explaining why the metrics system must be initialized after the task scheduler (because it needs the ID)? These three lines are duplicated elsewhere, so it would be good to consolidate their usages.
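
A minimal sketch of what that consolidation might look like (hypothetical method, not part of this PR; it assumes conf is the SparkConf held by the MetricsSystem):

// Hypothetical MetricsSystem#initialize consolidating the duplicated lines.
def initialize(appId: String): Unit = {
  conf.set("spark.app.id", appId)  // must be set before start() so buildRegistryName can see it
  start()
}

// SparkContext (and other callers) would then reduce to:
//   env.metricsSystem.initialize(taskScheduler.applicationId())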


// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
// Post init
taskScheduler.postStartHook()

private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
private def postApplicationStart() {
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser))
}

8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -259,11 +259,15 @@ object SparkEnv extends Logging {
}

val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
Contributor:

Need to add a comment here to explain why we don't initialize it right away if it's the driver (i.e. we need to wait for the task scheduler to give us an app ID).

} else {
MetricsSystem.createMetricsSystem("executor", conf, securityManager)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
metricsSystem.start()

Contributor:

remove random new line

// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
SparkHadoopUtil}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -693,16 +693,18 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir,

val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec

if (eventLogPaths.isEmpty) {
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in $eventLogDir."
var msg = s"No event logs found for application $appName in $appEventLogDir."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
@@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String]) {

SignalLogger.register(log)
@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()

// Create a new ActorSystem using driver's Spark properties to run the backend.
@@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {

def main(args: Array[String]) {
args.length match {
case x if x < 4 =>
case x if x < 5 =>
System.err.println(
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
"<cores> [<workerUrl>]")
"<cores> <appid> [<workerUrl>] ")
System.exit(1)
case 4 =>
run(args(0), args(1), args(2), args(3).toInt, None)
case x if x > 4 =>
run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
case 5 =>
run(args(0), args(1), args(2), args(3).toInt, args(4), None)
case x if x > 5 =>
run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
}
}
}
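
For reference, a hypothetical argument vector matching the new usage string (all values below are made up for illustration):

// Hypothetical invocation of the backend after this change; the values are illustrative only.
val args = Array(
  "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler",  // driverUrl
  "1",                                                                    // executorId
  "worker-host",                                                          // hostname
  "4",                                                                    // cores
  "app-20141003012537-0000",                                              // appid (new, now required)
  "akka.tcp://sparkWorker@worker-host:35000/user/Worker")                 // optional workerUrl
CoarseGrainedExecutorBackend.main(args)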
@@ -74,6 +74,7 @@ private[spark] class Executor(
val executorSource = new ExecutorSource(this, executorId)

// Initialize Spark environment (using system properties read above)
conf.set("spark.executor.id", "executor." + executorId)
private val env = {
if (!isLocal) {
val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
@@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)

override val metricRegistry = new MetricRegistry()

// TODO: It would be nice to pass the application name here
override val sourceName = "executor.%s".format(executorId)
override val sourceName = "executor"

// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
@@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend
slaveInfo: SlaveInfo) {
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
executor = new Executor(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
40 changes: 35 additions & 5 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -83,10 +83,10 @@ private[spark] class MetricsSystem private (
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())

metricsConfig.initialize()
registerSources()
registerSinks()

def start() {
registerSources()
registerSinks()
sinks.foreach(_.start)
}

@@ -98,19 +98,49 @@ private[spark] class MetricsSystem private (
sinks.foreach(_.report())
}

/**
* Build a name that uniquely identifies each metric source.
* The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
* If either ID is not available, this defaults to just using <source name>.
*
* @param source Metric source to be named by this method.
* @return A unique metric name for each combination of
* application, executor/driver and metric source.
*/
def buildRegistryName(source: Source): String = {
val appId = conf.getOption("spark.app.id")
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)

if (instance == "driver" || instance == "executor") {
if (appId.isDefined && executorId.isDefined) {
MetricRegistry.name(appId.get, executorId.get, source.sourceName)
} else {
// Only the driver and executors set spark.app.id and spark.executor.id.
// Other instances, such as Master and Worker, are not tied to a specific application.
val warningMsg = s"Using default name $defaultName for source because %s is not set."
if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
defaultName
}
} else { defaultName }
}

def registerSource(source: Source) {
sources += source
try {
registry.register(source.sourceName, source.metricRegistry)
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}

def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
}

@@ -125,7 +155,7 @@ private[spark] class MetricsSystem private (
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source

private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.DAGScheduler".format(sc.appName)
override val sourceName = "DAGScheduler"
Contributor:

Why this change? I'd say we want at least some relationship with the application here. Since we rely on the application ID we can solve SPARK-3377, but as a backup it's still not a bad idea to use the app name.

Contributor:

After this patch, I don't think that this sourceName is the full name of the source. If you look at Spark's MetricsSystem.registerSource() method, it takes the source name and prepends a bunch of app-specific information, including the appId, appName, executorId, etc.

Contributor:

I see. So really this is the sourceSuffix rather than the name. If there is a failure condition in the MetricsSystem then we just use the raw suffix. That sounds fine.


metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
@@ -43,38 +43,29 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
*/
private[spark] class EventLoggingListener(
appName: String,
appId: String,
logBaseDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

def this(appName: String, sparkConf: SparkConf) =
this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))

private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
.toLowerCase + "-" + System.currentTimeMillis
val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")

val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
val logDirName: String = logDir.split("/").last
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))

// For testing. Keep track of all JSON serialized events that have been logged.
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

/**
* Return only the unique application directory without the base directory.
*/
def getApplicationLogDir(): String = {
name
}

/**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
@@ -184,6 +175,18 @@ private[spark] object EventLoggingListener extends Logging {
} else ""
}

/**
* Return a file-system-safe path to the log directory for the given application.
*
* @param logBaseDir A base directory for the log directory of the given application.
* @param appId A unique app ID.
* @return A path which consists of file-system-safe characters.
*/
def getLogDirPath(logBaseDir: String, appId: String): String = {
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
}

/**
* Parse the event logging information associated with the logs in the given directory.
*
@@ -23,6 +23,8 @@ package org.apache.spark.scheduler
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
private val appId = "spark-application-" + System.currentTimeMillis

def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
@@ -33,10 +35,10 @@ private[spark] trait SchedulerBackend {
def isReady(): Boolean = true

/**
* The application ID associated with the job, if any.
* Get an application ID associated with the job.
*
* @return The application ID, or None if the backend does not provide an ID.
* @return An application ID
*/
def applicationId(): Option[String] = None
def applicationId(): String = appId

}