Commit 3150ed6

SPARK-1557 Set permissions on event log files/directories

1 parent 1fdf659 commit 3150ed6

File tree: 3 files changed, 26 additions, 12 deletions

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 7 additions & 5 deletions
@@ -51,10 +51,11 @@ private[spark] class EventLoggingListener(
   private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
+  val LOG_FILE_PERMISSIONS: FsPermission = FsPermission.createImmutable(0770: Short)
 
   private val logger =
     new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite)
+      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
 
   /**
    * Begin logging events.
@@ -64,10 +65,11 @@ private[spark] class EventLoggingListener(
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
       val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
-      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+      logger.newFile(COMPRESSION_CODEC_PREFIX + codec, Some(LOG_FILE_PERMISSIONS))
     }
-    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
-    logger.newFile(LOG_PREFIX + logger.fileIndex)
+    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION,
+      Some(LOG_FILE_PERMISSIONS))
+    logger.newFile(LOG_PREFIX + logger.fileIndex, Some(LOG_FILE_PERMISSIONS))
   }
 
   /** Log the event as JSON. */
@@ -114,7 +116,7 @@ private[spark] class EventLoggingListener(
    * In addition, create an empty special file to indicate application completion.
    */
  def stop() = {
-    logger.newFile(APPLICATION_COMPLETE)
+    logger.newFile(APPLICATION_COMPLETE, Some(LOG_FILE_PERMISSIONS))
    logger.stop()
  }
 }
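A note on the new constant: `0770: Short` uses a Scala octal literal, i.e. read, write, and execute for the owning user and group and no access for others, which matches the stated goal that only the user and group can read the event logs. A minimal sketch of the same permission built without an octal literal (illustrative only, not part of the commit):

import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

// Octal 0770 = rwxrwx---: full access for owner and group, none for others.
val fromShort = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)

// The same permission spelled out with FsAction values.
val fromActions = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE)

assert(fromShort.toShort == fromActions.toShort)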

core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 17 additions & 7 deletions
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io.CompressionCodec
+import org.apache.hadoop.fs.permission.FsPermission
 
 /**
  * A generic class for logging information to file.
@@ -42,7 +43,8 @@ private[spark] class FileLogger(
     hadoopConfiguration: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
-    overwrite: Boolean = true)
+    overwrite: Boolean = true,
+    dirPermissions: Option[FsPermission] = None)
   extends Logging {
 
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -60,12 +62,12 @@ private[spark] class FileLogger(
 
   private var writer: Option[PrintWriter] = None
 
-  createLogDir()
+  createLogDir(dirPermissions)
 
   /**
    * Create a logging directory with the given path.
    */
-  private def createLogDir() {
+  private def createLogDir(dirPerms: Option[FsPermission]) {
     val path = new Path(logDir)
     if (fileSystem.exists(path)) {
       if (overwrite) {
@@ -79,16 +81,23 @@ private[spark] class FileLogger(
     if (!fileSystem.mkdirs(path)) {
       throw new IOException("Error in creating log directory: %s".format(logDir))
     }
+    if (dirPerms.isDefined) {
+      val fsStatus = fileSystem.getFileStatus(path)
+      if (fsStatus.getPermission().toShort() != dirPerms.get.toShort()) {
+        fileSystem.setPermission(path, dirPerms.get)
+      }
+    }
   }
 
   /**
    * Create a new writer for the file identified by the given path.
    */
-  private def createWriter(fileName: String): PrintWriter = {
+  private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
     val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
     val isDefaultLocal = (defaultFs == null || defaultFs == "file")
+    val path = new Path(logPath)
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -97,11 +106,11 @@ private[spark] class FileLogger(
       // Second parameter is whether to append
       new FileOutputStream(uri.getPath, !overwrite)
     } else {
-      val path = new Path(logPath)
       hadoopDataStream = Some(fileSystem.create(path, overwrite))
       hadoopDataStream.get
     }
 
+    if (perms.isDefined) fileSystem.setPermission(path, perms.get)
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -150,15 +159,16 @@ private[spark] class FileLogger(
   /**
    * Start a writer for a new file, closing the existing one if it exists.
    * @param fileName Name of the new file, defaulting to the file index if not provided.
+   * @param perms Permissions to put on the new file.
    */
-  def newFile(fileName: String = "") {
+  def newFile(fileName: String = "", perms: Option[FsPermission] = None) {
     fileIndex += 1
     writer.foreach(_.close())
     val name = fileName match {
       case "" => fileIndex.toString
       case _ => fileName
     }
-    writer = Some(createWriter(name))
+    writer = Some(createWriter(name, perms))
   }
 
   /**
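Taken together, these changes thread permissions through both directory creation (dirPermissions, applied once in createLogDir) and file creation (perms, applied on every newFile call). Below is a hypothetical caller mirroring how EventLoggingListener drives the new parameters; FileLogger is private[spark], so this sketch assumes code compiled inside the org.apache.spark namespace, and the directory and file names are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.SparkConf
import org.apache.spark.util.FileLogger

val perms = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)

// The directory gets dirPermissions at construction time; each file gets
// its permissions when created via newFile().
val logger = new FileLogger(
  "/tmp/spark-events/my-app-1397000000000",  // illustrative log directory
  new SparkConf,
  new Configuration,
  dirPermissions = Some(perms))

logger.newFile("EVENT_LOG_0", Some(perms))  // illustrative file name
logger.stop()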

docs/security.md

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,8 @@ Spark currently supports authentication via a shared secret. Authentication can
 
 The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user, and once the user is logged in, Spark can compare that user against the view acls to make sure they are authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' control the behavior of the acls. Note that the person who started the application always has view access to the UI.
 
+If your applications use event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created with the proper permissions set on it. To keep those log files secure, the permissions on that directory should be set to drwxrwxrwt (octal 1777: world-writable with the sticky bit set). The owner of the directory should be the superuser running the history server, and the group should be restricted to the superuser group. This allows all users to write to the directory but prevents unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
+
 For Spark on Yarn deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
 
 For other types of Spark deployments, the spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
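To make the recommended directory setup concrete, here is a sketch using the Hadoop FileSystem API. The path is the spark.eventLog.dir default; whether FsPermission's short encoding carries the sticky bit depends on the Hadoop version, so treat this as illustrative and fall back to `hadoop fs -chmod 1777` on the directory if needed:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

val fs = FileSystem.get(new Configuration())
val eventLogDir = new Path("/tmp/spark-events")  // default spark.eventLog.dir

// Octal 1777 = rwxrwxrwt: world-writable with the sticky bit set, so
// users can only remove or rename the files they own.
if (!fs.exists(eventLogDir)) {
  fs.mkdirs(eventLogDir)
}
fs.setPermission(eventLogDir, new FsPermission(Integer.parseInt("1777", 8).toShort))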
