core/src/main/scala/org/apache/spark/SparkContext.scala (5 additions & 3 deletions)
@@ -227,9 +227,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   val appName = conf.get("spark.app.name")
 
   private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] val eventLogDir: Option[String] = {
+  private[spark] val eventLogDir: Option[URI] = {
     if (isEventLogEnabled) {
-      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+        .stripSuffix("/")
+      Some(Utils.resolveURI(unresolvedDir))
     } else {
       None
     }
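The effect of the new resolution step, as a minimal sketch (paths are illustrative; behavior assumed from Spark's Utils.resolveURI):

import java.net.URI
import org.apache.spark.util.Utils

// A bare local path gains the "file" scheme once resolved...
val local: URI = Utils.resolveURI("/tmp/spark-events")    // file:/tmp/spark-events
// ...while a URI that already carries a scheme passes through unchanged.
val remote: URI = Utils.resolveURI("hdfs://nn:8020/logs") // hdfs://nn:8020/logs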
@@ -1138,7 +1140,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Return whether dynamically adjusting the amount of resources allocated to
    * this application is supported. This is currently only available for YARN.
    */
-  private[spark] def supportDynamicAllocation = 
+  private[spark] def supportDynamicAllocation =
     master.contains("yarn") || dynamicAllocationTesting
 
   /**
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,13 +17,15 @@

 package org.apache.spark.deploy
 
+import java.net.URI
+
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None,
+    val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None)
   extends Serializable {
@@ -36,7 +38,7 @@ private[spark] class ApplicationDescription(
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir,
+      eventLogDir: Option[URI] = eventLogDir,
       eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
     new ApplicationDescription(
       name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
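A hedged sketch of how the URI-typed field travels through the copy helper (all values below are invented for illustration; only the types matter):

import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.util.Utils

// Hypothetical command and URLs, purely for the example.
val cmd = Command("org.example.Main", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty)
val desc = new ApplicationDescription(
  "my-app", Some(4), 1024, cmd, "http://driver:4040",
  eventLogDir = Some(Utils.resolveURI("/tmp/spark-events")))
// copy() carries the resolved URI through unchanged while swapping other fields.
val updated = desc.copy(appUiUrl = "http://driver:4041")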
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.master
 
 import java.io.FileNotFoundException
-import java.net.URLEncoder
+import java.net.{URI, URLEncoder}
[Review comment from a Contributor on the import change above]: this doesn't need to change. I'll fix this when I merge
 import java.text.SimpleDateFormat
 import java.util.Date
 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,21 +47,21 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 private[spark] class EventLoggingListener(
     appId: String,
-    logBaseDir: String,
+    logBaseDir: URI,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+  def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
     this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+  private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
       Some(CompressionCodec.createCodec(sparkConf))
@@ -259,13 +259,13 @@ private[spark] object EventLoggingListener extends Logging {
    * @return A path which consists of file-system-safe characters.
    */
   def getLogPath(
-      logBaseDir: String,
+      logBaseDir: URI,
       appId: String,
       compressionCodecName: Option[String] = None): String = {
     val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
     // e.g. app_123, app_123.lzf
     val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
-    Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
+    logBaseDir.toString.stripSuffix("/") + "/" + logName
   }
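To make the sanitization concrete, a small sketch of the expected outputs (these mirror the assertions in EventLoggingListenerSuite further down, stated as assumptions rather than run output):

import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.util.Utils

val base = Utils.resolveURI("/base-dir")                     // file:/base-dir
// ' ', ':' and '/' become '-'; '.', '$', '{', '}' and quotes become '_'.
EventLoggingListener.getLogPath(base, "app 1")               // file:/base-dir/app-1
EventLoggingListener.getLogPath(base, "app.1", Some("lzf"))  // file:/base-dir/app_1.lzf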

   /**
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -50,7 +50,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
       inProgress: Boolean,
       codec: Option[String] = None): File = {
     val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
-    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
     val logPath = new URI(logUri).getPath + ip
     new File(logPath)
   }
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf)
val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
eventLogger.start()

val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   }
 
   test("Log overwriting") {
-    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
     val logPath = new URI(logUri).getPath
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()
@@ -107,16 +107,19 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef

test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
Utils.resolveURI("/base-dir"), "app1"))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1"))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", Some("lz4")))
}

/* ----------------- *
@@ -137,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
-    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf)
+    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey")
@@ -173,12 +176,15 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+    // Set defaultFS to something that would cause an exception, to make sure we don't run
+    // into SPARK-6688.
     val conf = getLoggingConf(testDirPath, compressionCodec)
+      .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
     val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
     val eventLogPath = eventLogger.logPath
-    val expectedLogDir = testDir.toURI().toString()
+    val expectedLogDir = testDir.toURI()
     assert(eventLogPath === EventLoggingListener.getLogPath(
       expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
 
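Why the bogus defaultFS exercises SPARK-6688: a scheme-less log path handed to Hadoop gets qualified against fs.defaultFS, so an unusable default filesystem would break listener setup unless the directory was already resolved to a full URI. A hypothetical repro sketch using the standard Hadoop FileSystem API (scheme and host invented):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "unsupported://example.com")
// A scheme-less URI falls back to defaultFS and would throw
// "No FileSystem for scheme: unsupported":
// FileSystem.get(new URI("/tmp/spark-events"), hadoopConf)
// A resolved file: URI ignores defaultFS and loads the local filesystem.
val fs = FileSystem.get(new URI("file:/tmp/spark-events"), hadoopConf)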
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, PrintWriter}
+import java.net.URI
 
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -145,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
  * log the events.
  */
 private class EventMonster(conf: SparkConf)
-  extends EventLoggingListener("test", "testdir", conf) {
+  extends EventLoggingListener("test", new URI("testdir"), conf) {
 
   override def start() { }
 