Commit 14632b7

Marcelo Vanzin authored and Andrew Or committed
[SPARK-6688] [core] Always use resolved URIs in EventLoggingListener.
Author: Marcelo Vanzin <[email protected]>

Closes apache#5340 from vanzin/SPARK-6688 and squashes the following commits:

ccfddd9 [Marcelo Vanzin] Resolve at the source.
20d2a34 [Marcelo Vanzin] [SPARK-6688] [core] Always use resolved URIs in EventLoggingListener.
1 parent ffe8cc9 commit 14632b7
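
The gist of the fix: resolve the configured event log directory to a fully qualified URI once, at the source (SparkContext), rather than passing a raw string around and letting Hadoop qualify it later against fs.defaultFS. A minimal sketch of the distinction, using Spark's own Utils helper (the directory value is illustrative):

    import org.apache.spark.util.Utils

    // A bare path carries no scheme; left unresolved, Hadoop would later
    // qualify it against fs.defaultFS (e.g. hdfs://namenode/...).
    val raw = "/tmp/spark-events"

    // Utils.resolveURI qualifies it eagerly, defaulting to the local file
    // system when no scheme is present: file:/tmp/spark-events
    val resolved: java.net.URI = Utils.resolveURI(raw)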

File tree: 6 files changed, +30 −19 lines

core/src/main/scala/org/apache/spark/SparkContext.scala (4 additions, 2 deletions)

@@ -227,9 +227,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   val appName = conf.get("spark.app.name")
 
   private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] val eventLogDir: Option[String] = {
+  private[spark] val eventLogDir: Option[URI] = {
     if (isEventLogEnabled) {
-      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+        .stripSuffix("/")
+      Some(Utils.resolveURI(unresolvedDir))
     } else {
       None
     }
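
With the change above, eventLogDir holds a resolved java.net.URI from the moment the driver starts. A hedged driver-side sketch (configuration values illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events")
    // After construction, SparkContext.eventLogDir is
    // Some(file:/tmp/spark-events) -- already resolved, regardless of
    // what fs.defaultFS points at.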

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala (4 additions, 2 deletions)

@@ -17,13 +17,15 @@
 
 package org.apache.spark.deploy
 
+import java.net.URI
+
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None,
+    val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None)
   extends Serializable {

@@ -36,7 +38,7 @@ private[spark] class ApplicationDescription(
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir,
+      eventLogDir: Option[URI] = eventLogDir,
       eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
     new ApplicationDescription(
       name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
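
The type change is safe on the wire: java.net.URI is Serializable, and ApplicationDescription extends Serializable, so the resolved directory should reach the standalone Master intact. A quick round-trip sketch under that assumption (path illustrative):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.net.URI

    val out = new ByteArrayOutputStream()
    new ObjectOutputStream(out).writeObject(new URI("file:/tmp/spark-events"))
    val in = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
    // The URI deserializes back equal to the original.
    assert(in.readObject().asInstanceOf[URI] == new URI("file:/tmp/spark-events"))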

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala (5 additions, 5 deletions)

@@ -47,21 +47,21 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 private[spark] class EventLoggingListener(
     appId: String,
-    logBaseDir: String,
+    logBaseDir: URI,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+  def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
     this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+  private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
       Some(CompressionCodec.createCodec(sparkConf))

@@ -259,13 +259,13 @@ private[spark] object EventLoggingListener extends Logging {
    * @return A path which consists of file-system-safe characters.
    */
   def getLogPath(
-      logBaseDir: String,
+      logBaseDir: URI,
       appId: String,
       compressionCodecName: Option[String] = None): String = {
     val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
     // e.g. app_123, app_123.lzf
     val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
-    Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
+    logBaseDir.toString.stripSuffix("/") + "/" + logName
   }
 
   /**
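
Since getLogPath now receives an already-resolved URI, it only sanitizes the app ID and concatenates; there is no second resolution pass. A usage sketch (values illustrative; note these are private[spark] APIs):

    import org.apache.spark.scheduler.EventLoggingListener
    import org.apache.spark.util.Utils

    val base = Utils.resolveURI("/base-dir")  // file:/base-dir
    // Spaces, colons and slashes in the app ID become '-', the result is
    // lowercased, and the codec short name is appended:
    // "file:/base-dir/app-123.lzf"
    val path = EventLoggingListener.getLogPath(base, "app 123", Some("lzf"))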

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala (1 addition, 1 deletion)

@@ -50,7 +50,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
       inProgress: Boolean,
       codec: Option[String] = None): File = {
     val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
-    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
     val logPath = new URI(logUri).getPath + ip
     new File(logPath)
   }
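
The test can drop the string-based absolute path because java.io.File#toURI already yields a fully qualified file: URI. For example (path illustrative):

    import java.io.File

    val dir = new File("/tmp/history-test")
    // file:/tmp/history-test -- with a trailing slash appended when the
    // directory actually exists; getLogPath strips it either way.
    val uri = dir.toURI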

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala (14 additions, 8 deletions)

@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   test("Verify log file exist") {
     // Verify logging directory exists
     val conf = getLoggingConf(testDirPath)
-    val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf)
+    val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
     eventLogger.start()
 
     val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)

@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   }
 
   test("Log overwriting") {
-    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
     val logPath = new URI(logUri).getPath
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()

@@ -107,16 +107,19 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
 
   test("Event log name") {
     // without compression
-    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
+    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
+      Utils.resolveURI("/base-dir"), "app1"))
     // with compression
     assert(s"file:/base-dir/app1.lzf" ===
-      EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
     // illegal characters in app ID
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
-      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+        "a fine:mind$dollar{bills}.1"))
     // illegal characters in app ID with compression
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
-      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+        "a fine:mind$dollar{bills}.1", Some("lz4")))
   }
 
   /* ----------------- *

@@ -137,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
-    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf)
+    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey")

@@ -173,12 +176,15 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+    // Set defaultFS to something that would cause an exception, to make sure we don't run
+    // into SPARK-6688.
     val conf = getLoggingConf(testDirPath, compressionCodec)
+      .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
     val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
     val eventLogPath = eventLogger.logPath
-    val expectedLogDir = testDir.toURI().toString()
+    val expectedLogDir = testDir.toURI()
     assert(eventLogPath === EventLoggingListener.getLogPath(
       expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala (2 additions, 1 deletion)

@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, PrintWriter}
+import java.net.URI
 
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}

@@ -145,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * log the events.
    */
   private class EventMonster(conf: SparkConf)
-    extends EventLoggingListener("test", "testdir", conf) {
+    extends EventLoggingListener("test", new URI("testdir"), conf) {
 
     override def start() { }