
Commit dd1b7a6

Marcelo Vanzin authored and pwendell committed
Honor default fs name when initializing event logger.
This is related to SPARK-1459 / PR #375. Without this fix, FileLogger.createLogDir() may try to create the log dir on HDFS, while createWriter() will try to open the log file on the local file system, leading to interesting errors and confusion.

Author: Marcelo Vanzin <[email protected]>

Closes #450 from vanzin/event-file-2 and squashes the following commits:

592cdb3 [Marcelo Vanzin] Honor default fs name when initializing event logger.
1 parent a967b00 commit dd1b7a6
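
To make the failure mode concrete: Hadoop's Path API resolves a scheme-less path such as "/tmp/spark-events" against the configured default file system, while inspecting the raw URI sees a null scheme and falls back to the local disk, so the two FileLogger methods could land on different file systems. A minimal sketch of that ambiguity, assuming a hypothetical cluster whose default FS is HDFS (the namenode address below is made up):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val hadoopConf = new Configuration()
    // Hypothetical cluster setting; newer Hadoop spells this key "fs.defaultFS".
    hadoopConf.set("fs.default.name", "hdfs://namenode:8020")

    val logDir = "/tmp/spark-events"            // no scheme in the path
    val dirFs = new Path(logDir).getFileSystem(hadoopConf)
    println(dirFs.getUri)                       // hdfs://namenode:8020 -- dir resolves to HDFS
    println(new URI(logDir).getScheme)          // null -- naively read as "local file"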

File tree

3 files changed: +41 -33 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 24 additions & 24 deletions
@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val ui = new SparkUI(this)
   ui.bind()
 
+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration: Configuration = {
+    val env = SparkEnv.get
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+    // Explicitly check for S3 environment variables
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+    }
+    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      }
+    }
+    val bufferSize = conf.get("spark.buffer.size", "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
+  }
+
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf)
+      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
   postEnvironmentUpdate()
   postApplicationStart()
 
-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
-
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
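
A side effect of hoisting hadoopConfiguration above the event logger: any "spark.hadoop.*" entry on the SparkConf now reaches the Hadoop Configuration before EventLoggingListener is constructed. A standalone sketch of just the copy rule, using a hypothetical key and value:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.hadoop.fs.default.name", "hdfs://namenode:8020") // hypothetical value
    val hadoopConf = new Configuration()
    // Strip the "spark.hadoop." prefix and forward the rest to Hadoop.
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      }
    }
    assert(hadoopConf.get("fs.default.name") == "hdfs://namenode:8020")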

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 7 additions & 2 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
 
@@ -36,7 +37,10 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  * spark.eventLog.dir - Path to the directory in which events are logged.
  * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
-private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
+private[spark] class EventLoggingListener(
+    appName: String,
+    conf: SparkConf,
+    hadoopConfiguration: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
@@ -49,7 +53,8 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   val logDir = logBaseDir + "/" + name
 
   private val logger =
-    new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
+    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
+      shouldOverwrite)
 
   /**
    * Begin logging events.
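
Since the Configuration is now a required constructor argument, any caller outside SparkContext (a test, say) has to supply one explicitly; a plain new Configuration() preserves the old behavior of reading whatever core-site.xml is on the classpath. A hypothetical standalone construction:

    // "myApp" is a made-up application name for illustration.
    val listener = new EventLoggingListener("myApp", new SparkConf, new Configuration())
    listener.start()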

core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 10 additions & 7 deletions
@@ -22,7 +22,8 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io.CompressionCodec
@@ -37,7 +38,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf = new SparkConf,
+    conf: SparkConf,
+    hadoopConfiguration: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true)
@@ -85,19 +87,20 @@ private[spark] class FileLogger(
   private def createWriter(fileName: String): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
+    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
-    val dstream = uri.getScheme match {
-      case "file" | null =>
+    val dstream =
+      if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
         // Second parameter is whether to append
         new FileOutputStream(uri.getPath, !overwrite)
-
-      case _ =>
+      } else {
         val path = new Path(logPath)
         hadoopDataStream = Some(fileSystem.create(path, overwrite))
         hadoopDataStream.get
-    }
+      }
 
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
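
The heart of the fix is the new stream-selection rule: a scheme-less path counts as local only when the default file system itself is local. Restated as a standalone predicate for illustration (the function name below is not in the patch):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    def usesLocalStream(logPath: String, hadoopConf: Configuration): Boolean = {
      val uri = new URI(logPath)
      val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
      val isDefaultLocal = defaultFs == null || defaultFs == "file"
      // Local only for an explicit file:// scheme, or for a bare path
      // when the default FS is itself local.
      (isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file"
    }

With the default FS pointing at HDFS, usesLocalStream("/tmp/spark-events/app", hadoopConf) is false, so createWriter() goes through fileSystem.create() on the same file system where createLogDir() made the directory, which is exactly what the commit message promises.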
