From 10d254b4339d3e024f717e0f0666f996527bab23 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 31 Mar 2017 17:35:47 +0800
Subject: [PATCH 1/7] Add file permission check in listing files

Change-Id: I7c7014ed83dbba786d90ca530380e94c086b497b
---
 .../deploy/history/FsHistoryProvider.scala    |  8 +++--
 .../history/FsHistoryProviderSuite.scala      | 29 +++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9012736bc274..174f4d87de40 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,7 +27,8 @@ import scala.xml.Node
 
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -320,6 +321,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .filter { entry =>
         try {
           val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+          fs.access(entry.getPath, FsAction.READ)
           !entry.isDirectory() &&
             // FsHistoryProvider generates a hidden file which can't be read. Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
@@ -327,7 +329,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             !entry.getPath().getName().startsWith(".") &&
             prevFileSize < entry.getLen()
         } catch {
-          case e: AccessControlException =>
+          case _: AccessControlException =>
             // Do not use "logInfo" since these messages can get pretty noisy if printed on
             // every poll.
            logDebug(s"No permission to read $entry, ignoring.")
@@ -445,7 +447,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
-  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
       val eventsFilter: ReplayEventsFilter = { eventString =>
         eventString.startsWith(APPL_START_EVENT_PREFIX) ||
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index ec580a44b8e7..be833104ee4a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -571,6 +572,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("log without read permission should be filtered out before actual reading") {
+    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
+      var mergeApplicationListingCall = 0
+      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+        super.mergeApplicationListing(fileStatus)
+        mergeApplicationListingCall += 1
+      }
+    }
+
+    val provider = new TestFsHistoryProvider
+    val log = newLogFile("app1", Some("app1"), inProgress = true)
+    writeFile(log, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
+        "test", Some("attempt1")),
+      SparkListenerApplicationEnd(System.currentTimeMillis()))
+
+    // Set the read permission to false to simulate a scenario where access is not permitted.
+    log.setReadable(false)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+
+    // Logs without read permission are filtered out beforehand, so the provider sees an empty
+    // file list and never invokes mergeApplicationListing().
+    provider.mergeApplicationListingCall should be (0)
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
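A note on the API patch 1 leans on: FileSystem.access() (Hadoop 2.6+) asks the filesystem to verify that the calling user holds the requested FsAction on the path, and it signals denial by throwing AccessControlException rather than returning a value; the try/catch around the filter body is what turns that exception into "skip this entry". A minimal standalone sketch of the pattern (illustrative only, not Spark code; the path is arbitrary):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

object AccessCheckSketch {
  // Returns true iff the current user may read `path`; fs.access() is a
  // no-op on success and throws AccessControlException on denial.
  def isReadable(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.access(path, FsAction.READ)
      true
    } catch {
      case _: AccessControlException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    println(isReadable(fs, new Path("/tmp")))
  }
}

On HDFS each access() call is an extra round trip to the NameNode for every listed file, which is a plausible motivation for the later patches moving to a client-side check against the already-fetched FileStatus.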
logDebug(s"No permission to read $entry, ignoring.") @@ -445,7 +447,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Replay the log files in the list and merge the list of old applications with new ones */ - private def mergeApplicationListing(fileStatus: FileStatus): Unit = { + protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { val newAttempts = try { val eventsFilter: ReplayEventsFilter = { eventString => eventString.startsWith(APPL_START_EVENT_PREFIX) || diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index ec580a44b8e7..be833104ee4a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.hdfs.DistributedFileSystem import org.json4s.jackson.JsonMethods._ import org.mockito.Matchers.any @@ -571,6 +572,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("log without read permission should be filtered out before actual reading") { + class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) { + var mergeApplicationListingCall = 0 + override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { + super.mergeApplicationListing(fileStatus) + mergeApplicationListingCall += 1 + } + } + + val provider = new TestFsHistoryProvider + val log = newLogFile("app1", Some("app1"), inProgress = true) + writeFile(log, true, None, + SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), + "test", Some("attempt1")), + SparkListenerApplicationEnd(System.currentTimeMillis())) + + // Set the read permission to false to simulate access permission not allowed scenario. + log.setReadable(false) + + updateAndCheck(provider) { list => + list.size should be (0) + } + + // Because we already filter out logs without read permission, so it will get a empty file list + // and not invoke mergeApplicationListing() call. + provider.mergeApplicationListingCall should be (0) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: From 63142cc82cc131b51c9d66525670caf4cba2ad39 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 31 Mar 2017 17:50:38 +0800 Subject: [PATCH 2/7] Also add logics to handle other exceptions Change-Id: Ie2bd075561e481266dc8047e2af604f4d6c83810 --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 174f4d87de40..85691598b425 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -334,6 +334,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // every poll. 
logDebug(s"No permission to read $entry, ignoring.") false + + case e: Exception => + logDebug(s"Fail to get status of $entry", e) + false } } .flatMap { entry => Some(entry) } From 1d1440bcd956f0f80e299edea72bf54e1f0d6b0d Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 11 Apr 2017 10:45:41 +0800 Subject: [PATCH 3/7] Address the comments Change-Id: I95fe3f765c8b66cc14bc888899f99dc8a0466e91 --- .../deploy/history/FsHistoryProvider.scala | 30 ++++++++++--- .../history/FsHistoryProviderSuite.scala | 43 ++++++------------- 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 85691598b425..c74d11168211 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.hdfs.protocol.HdfsConstants -import org.apache.hadoop.security.AccessControlException +import org.apache.hadoop.security.{AccessControlException, UserGroupInformation} import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil @@ -321,23 +321,39 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .filter { entry => try { val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) - fs.access(entry.getPath, FsAction.READ) + + def canAccess = { + val perm = entry.getPermission + val ugi = UserGroupInformation.getCurrentUser + val user = ugi.getShortUserName + val groups = ugi.getGroupNames + if (user == entry.getOwner && perm.getUserAction.implies(FsAction.READ)) { + true + } else if (groups.contains(entry.getGroup) && + perm.getGroupAction.implies(FsAction.READ)) { + true + } else if (perm.getOtherAction.implies(FsAction.READ)) { + true + } else { + throw new AccessControlException(s"Permission denied: user=$user, " + + s"path=${entry.getPath}:${entry.getOwner}:${entry.getGroup}" + + s"${if (entry.isDirectory) "d" else "-"}$perm") + } + } + !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && - prevFileSize < entry.getLen() + prevFileSize < entry.getLen() && + canAccess } catch { case _: AccessControlException => // Do not use "logInfo" since these messages can get pretty noisy if printed on // every poll. 
logDebug(s"No permission to read $entry, ignoring.") false - - case e: Exception => - logDebug(s"Fail to get status of $entry", e) - false } } .flatMap { entry => Some(entry) } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index be833104ee4a..456158d41b93 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -131,9 +131,19 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } - test("SPARK-3697: ignore directories that cannot be read.") { + test("SPARK-3697: ignore files that cannot be read.") { // setReadable(...) does not work on Windows. Please refer JDK-6728842. assume(!Utils.isWindows) + + class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) { + var mergeApplicationListingCall = 0 + override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { + super.mergeApplicationListing(fileStatus) + mergeApplicationListingCall += 1 + } + } + val provider = new TestFsHistoryProvider + val logFile1 = newLogFile("new1", None, inProgress = false) writeFile(logFile1, true, None, SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None), @@ -146,10 +156,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ) logFile2.setReadable(false, false) - val provider = new FsHistoryProvider(createTestConf()) updateAndCheck(provider) { list => list.size should be (1) } + + provider.mergeApplicationListingCall should be (1) } test("history file is renamed from inprogress to completed") { @@ -572,34 +583,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } - test("log without read permission should be filtered out before actual reading") { - class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) { - var mergeApplicationListingCall = 0 - override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { - super.mergeApplicationListing(fileStatus) - mergeApplicationListingCall += 1 - } - } - - val provider = new TestFsHistoryProvider - val log = newLogFile("app1", Some("app1"), inProgress = true) - writeFile(log, true, None, - SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), - "test", Some("attempt1")), - SparkListenerApplicationEnd(System.currentTimeMillis())) - - // Set the read permission to false to simulate access permission not allowed scenario. - log.setReadable(false) - - updateAndCheck(provider) { list => - list.size should be (0) - } - - // Because we already filter out logs without read permission, so it will get a empty file list - // and not invoke mergeApplicationListing() call. - provider.mergeApplicationListingCall should be (0) - } - /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. 
From 944701123cab5705d43769e4598226d27f105a58 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 13 Apr 2017 12:00:44 +0800
Subject: [PATCH 4/7] Further address the comments

Change-Id: I38a5491e555496004204bf99634f7147dac6c642
---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 23 ++++++++++
 .../deploy/history/FsHistoryProvider.scala    | 46 ++++---------------
 .../history/FsHistoryProviderSuite.scala      | 17 ++++++-
 3 files changed, 48 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f475ce87540a..68f28fd4e646 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -353,6 +354,28 @@ class SparkHadoopUtil extends Logging {
     }
     buffer.toString
   }
+
+  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
+    val perm = status.getPermission
+    val ugi = UserGroupInformation.getCurrentUser
+
+    if (ugi.getShortUserName == status.getOwner) {
+      if (perm.getUserAction.implies(mode)) {
+        return true
+      }
+    } else if (ugi.getGroupNames.contains(status.getGroup)) {
+      if (perm.getGroupAction.implies(mode)) {
+        return true
+      }
+    } else if (perm.getOtherAction.implies(mode)) {
+      return true
+    }
+
+    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
+      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
+      s"${if (status.isDirectory) "d" else "-"}$perm")
+    false
+  }
 }
 
 object SparkHadoopUtil {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c74d11168211..f4235df24512 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
-import org.apache.hadoop.security.{AccessControlException, UserGroupInformation}
+import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -319,42 +319,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // scan for modified applications, replay and merge them
     val logInfos: Seq[FileStatus] = statusList
       .filter { entry =>
-        try {
-          val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-
-          def canAccess = {
-            val perm = entry.getPermission
-            val ugi = UserGroupInformation.getCurrentUser
-            val user = ugi.getShortUserName
-            val groups = ugi.getGroupNames
-            if (user == entry.getOwner && perm.getUserAction.implies(FsAction.READ)) {
-              true
-            } else if (groups.contains(entry.getGroup) &&
-                perm.getGroupAction.implies(FsAction.READ)) {
-              true
-            } else if (perm.getOtherAction.implies(FsAction.READ)) {
-              true
-            } else {
-              throw new AccessControlException(s"Permission denied: user=$user, " +
-                s"path=${entry.getPath}:${entry.getOwner}:${entry.getGroup}" +
-                s"${if (entry.isDirectory) "d" else "-"}$perm")
-            }
-          }
-
-          !entry.isDirectory() &&
-            // FsHistoryProvider generates a hidden file which can't be read. Accidentally
-            // reading a garbage file is safe, but we would log an error which can be scary to
-            // the end-user.
-            !entry.getPath().getName().startsWith(".") &&
-            prevFileSize < entry.getLen() &&
-            canAccess
-        } catch {
-          case _: AccessControlException =>
-            // Do not use "logInfo" since these messages can get pretty noisy if printed on
-            // every poll.
-            logDebug(s"No permission to read $entry, ignoring.")
-            false
-        }
+        val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+        !entry.isDirectory() &&
+          // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+          // reading a garbage file is safe, but we would log an error which can be scary to
+          // the end-user.
+          !entry.getPath().getName().startsWith(".") &&
+          prevFileSize < entry.getLen() &&
+          SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
       }
       .flatMap { entry => Some(entry) }
       .sortWith { case (entry1, entry2) =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 456158d41b93..b7c3a681ec48 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,7 +27,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -37,6 +38,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
@@ -154,7 +156,20 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
     )
+
+    val path = new Path(logFile2.toURI)
+    val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
+    val status = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ) should be (true)
+
     logFile2.setReadable(false, false)
+    val status1 = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status1, FsAction.READ) should be (false)
+
+    logFile2.setReadable(false, true)
+    val status2 = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status2, FsAction.READ) should be (false)
+
     updateAndCheck(provider) { list =>
       list.size should be (1)
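With patch 4 the check becomes a reusable, side-effect-free helper. A hypothetical caller (any file under an org.apache.spark package, since checkAccessPermission is private[spark]; the status values below are synthetic):

package org.apache.spark.example  // hypothetical package, for illustration only

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

import org.apache.spark.deploy.SparkHadoopUtil

object CheckPermissionSketch {
  def main(args: Array[String]): Unit = {
    // A hand-built status: owned by "alice"/"staff", mode rw-r----- (640).
    val status = new FileStatus(0L, false, 1, 0L, 0L, 0L,
      new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
      "alice", "staff", new Path("/history/app-0001"))
    // True only if the current user is "alice" or belongs to group "staff".
    println(SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ))
  }
}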
From 4d3838a3050e849189d5cd48511f7c1bc891cd92 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Tue, 18 Apr 2017 12:31:19 +0800
Subject: [PATCH 5/7] Address the comments to change the UT

Change-Id: Ib9ac4be0a896531a529c260197431df1c3adf77a
---
 .../spark/deploy/SparkHadoopUtilSuite.scala   | 83 +++++++++++++++++++
 .../history/FsHistoryProviderSuite.scala      | 17 +---
 2 files changed, 84 insertions(+), 16 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
new file mode 100644
index 000000000000..da03de3f2d06
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.apache.hadoop.security.UserGroupInformation
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+
+class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
+  test("check file permission") {
+    import FsAction._
+    val user = UserGroupInformation.getCurrentUser.getShortUserName
+    val groups = UserGroupInformation.getCurrentUser.getGroupNames
+    require(!groups.isEmpty)
+    val sparkHadoopUtil = new SparkHadoopUtil
+
+    // If file is owned by user and user has access permission
+    var status = fileStatus(user, groups.head, READ_WRITE, READ_WRITE, NONE)
+    sparkHadoopUtil.checkAccessPermission(status, READ) should be (true)
+    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (true)
+
+    // If file is owned by user but user has no access permission
+    status = fileStatus(user, groups.head, NONE, READ_WRITE, NONE)
+    sparkHadoopUtil.checkAccessPermission(status, READ) should be (false)
+    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (false)
+
+    // If file is owned by user's group and user's group has access permission
+    status = fileStatus("test", groups.head, NONE, READ_WRITE, NONE)
+    sparkHadoopUtil.checkAccessPermission(status, READ) should be (true)
+    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (true)
+
+    // If file is owned by user's group but user's group has no access permission
+    status = fileStatus("test", groups.head, READ_WRITE, NONE, NONE)
+    sparkHadoopUtil.checkAccessPermission(status, READ) should be (false)
+    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (false)
+
+    // If file is owned by other user and this user has access permission
+    status = fileStatus("test", "test", READ_WRITE, READ_WRITE, READ_WRITE)
+    sparkHadoopUtil.checkAccessPermission(status, READ) should be (true)
+    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (true)
+
+    // If file is owned by other user but this user has no access permission
+    status = fileStatus("test", "test", READ_WRITE, READ_WRITE, NONE)
+    sparkHadoopUtil.checkAccessPermission(status, READ) should be (false)
+    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (false)
+  }
+
+  private def fileStatus(
+      owner: String,
+      group: String,
+      userAction: FsAction,
+      groupAction: FsAction,
+      otherAction: FsAction): FileStatus = {
+    new FileStatus(0L,
+      false,
+      0,
+      0L,
+      0L,
+      0L,
+      new FsPermission(userAction, groupAction, otherAction),
+      owner,
+      group,
+      null)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index b7c3a681ec48..456158d41b93 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,8 +27,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -38,7 +37,6 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
@@ -156,20 +154,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
     )
-
-    val path = new Path(logFile2.toURI)
-    val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
-    val status = fs.getFileStatus(path)
-    SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ) should be (true)
-
     logFile2.setReadable(false, false)
-    val status1 = fs.getFileStatus(path)
-    SparkHadoopUtil.get.checkAccessPermission(status1, FsAction.READ) should be (false)
-
-    logFile2.setReadable(false, true)
-    val status2 = fs.getFileStatus(path)
-    SparkHadoopUtil.get.checkAccessPermission(status2, FsAction.READ) should be (false)
-
     updateAndCheck(provider) { list =>
       list.size should be (1)
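One subtlety the new suite pins down: the branches are mutually exclusive, keyed on who you are, not on what any bit grants. If the caller owns the file, only the user bits are consulted; with mode ---rw-rw- the owner is denied READ even though the group and other bits would allow it, matching POSIX/HDFS evaluation order. A sketch of that edge case (again assuming a caller inside an org.apache.spark package):

package org.apache.spark.example  // hypothetical package, for illustration only

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.deploy.SparkHadoopUtil

object OwnerBitsSketch {
  def main(args: Array[String]): Unit = {
    val me = UserGroupInformation.getCurrentUser.getShortUserName
    // File owned by the current user, mode ---rw-rw-.
    val status = new FileStatus(0L, false, 1, 0L, 0L, 0L,
      new FsPermission(FsAction.NONE, FsAction.READ_WRITE, FsAction.READ_WRITE),
      me, "somegroup", new Path("/tmp/f"))
    // The owner match short-circuits: permissive group/other bits are ignored.
    println(SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ))  // false
  }
}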
From ab5117b60d666a7781fc2f847b1d6e0060b4cc9a Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Wed, 19 Apr 2017 09:25:21 +0800
Subject: [PATCH 6/7] Use test UGI to mimic the current user and groups

Change-Id: Iafc17472e44a511402a3faa5e1889fa445b3c386
---
 .../spark/deploy/SparkHadoopUtilSuite.scala   | 84 ++++++++++++-------
 1 file changed, 53 insertions(+), 31 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
index da03de3f2d06..91114b7c7066 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.deploy
 
+import java.security.PrivilegedExceptionAction
+
+import scala.util.Random
+
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.hadoop.security.UserGroupInformation
@@ -30,37 +34,55 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
     val user = UserGroupInformation.getCurrentUser.getShortUserName
     val groups = UserGroupInformation.getCurrentUser.getGroupNames
     require(!groups.isEmpty)
-    val sparkHadoopUtil = new SparkHadoopUtil
-
-    // If file is owned by user and user has access permission
-    var status = fileStatus(user, groups.head, READ_WRITE, READ_WRITE, NONE)
-    sparkHadoopUtil.checkAccessPermission(status, READ) should be (true)
-    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (true)
-
-    // If file is owned by user but user has no access permission
-    status = fileStatus(user, groups.head, NONE, READ_WRITE, NONE)
-    sparkHadoopUtil.checkAccessPermission(status, READ) should be (false)
-    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (false)
-
-    // If file is owned by user's group and user's group has access permission
-    status = fileStatus("test", groups.head, NONE, READ_WRITE, NONE)
-    sparkHadoopUtil.checkAccessPermission(status, READ) should be (true)
-    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (true)
-
-    // If file is owned by user's group but user's group has no access permission
-    status = fileStatus("test", groups.head, READ_WRITE, NONE, NONE)
-    sparkHadoopUtil.checkAccessPermission(status, READ) should be (false)
-    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (false)
-
-    // If file is owned by other user and this user has access permission
-    status = fileStatus("test", "test", READ_WRITE, READ_WRITE, READ_WRITE)
-    sparkHadoopUtil.checkAccessPermission(status, READ) should be (true)
-    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (true)
-
-    // If file is owned by other user but this user has no access permission
-    status = fileStatus("test", "test", READ_WRITE, READ_WRITE, NONE)
-    sparkHadoopUtil.checkAccessPermission(status, READ) should be (false)
-    sparkHadoopUtil.checkAccessPermission(status, WRITE) should be (false)
+
+    val testUser = user + "-" + Random.nextInt(100)
+    val testGroups = groups.map { g => g + "-" + Random.nextInt(100) }
+    val testUgi = UserGroupInformation.createUserForTesting(testUser, testGroups)
+
+    testUgi.doAs(new PrivilegedExceptionAction[Void] {
+      override def run(): Void = {
+        val sparkHadoopUtil = new SparkHadoopUtil
+
+        // If file is owned by user and user has access permission
+        var status = fileStatus(testUser, testGroups.head, READ_WRITE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // If file is owned by user but user has no access permission
+        status = fileStatus(testUser, testGroups.head, NONE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        var otherUser = "test"
+        var otherGroup = "test"
+        while (otherUser == testUser || testGroups.contains(otherGroup)) {
+          otherUser = s"test-${Random.nextInt(100)}"
+          otherGroup = s"test-${Random.nextInt(100)}"
+        }
+
+        // If file is owned by user's group and user's group has access permission
+        status = fileStatus(otherUser, testGroups.head, NONE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // If file is owned by user's group but user's group has no access permission
+        status = fileStatus(otherUser, testGroups.head, READ_WRITE, NONE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        // If file is owned by other user and this user has access permission
+        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, READ_WRITE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // If file is owned by other user but this user has no access permission
+        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        null
+      }
+    })
   }
 
   private def fileStatus(
       owner: String,
       group: String,
       userAction: FsAction,
       groupAction: FsAction,
       otherAction: FsAction): FileStatus = {
     new FileStatus(0L,
       false,
       0,
       0L,
       0L,
       0L,
       new FsPermission(userAction, groupAction, otherAction),
       owner,
       group,
       null)
   }
 }
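The UGI pattern patch 6 adopts is worth seeing in isolation: createUserForTesting fabricates a user-to-groups mapping without touching the OS, and doAs makes that identity the "current user" for everything inside the closure, which is exactly what checkAccessPermission consults. A standalone sketch (names are arbitrary):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object UgiSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createUserForTesting("alice", Array("analysts"))
    val who = ugi.doAs(new PrivilegedExceptionAction[String] {
      // Inside run(), getCurrentUser resolves to the fabricated identity.
      override def run(): String =
        UserGroupInformation.getCurrentUser.getShortUserName
    })
    println(who)  // prints "alice"
  }
}

This removes the test's dependence on the host's real user and group database, which is what made the earlier setReadable-based variants environment-sensitive.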
From b36fa75e4b2414d3df9bf6921cccc081147176e4 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 20 Apr 2017 11:09:01 +0800
Subject: [PATCH 7/7] Address the comments

Change-Id: Ic9851b569da3458895e7c7ea8a5474941c466585
---
 .../spark/deploy/SparkHadoopUtilSuite.scala   | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
index 91114b7c7066..ab24a76e20a3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -31,12 +31,8 @@ import org.apache.spark.SparkFunSuite
 class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
   test("check file permission") {
     import FsAction._
-    val user = UserGroupInformation.getCurrentUser.getShortUserName
-    val groups = UserGroupInformation.getCurrentUser.getGroupNames
-    require(!groups.isEmpty)
-
-    val testUser = user + "-" + Random.nextInt(100)
-    val testGroups = groups.map { g => g + "-" + Random.nextInt(100) }
+    val testUser = s"user-${Random.nextInt(100)}"
+    val testGroups = Array(s"group-${Random.nextInt(100)}")
     val testUgi = UserGroupInformation.createUserForTesting(testUser, testGroups)
 
     testUgi.doAs(new PrivilegedExceptionAction[Void] {
@@ -53,12 +49,8 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
         sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
         sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
 
-        var otherUser = "test"
-        var otherGroup = "test"
-        while (otherUser == testUser || testGroups.contains(otherGroup)) {
-          otherUser = s"test-${Random.nextInt(100)}"
-          otherGroup = s"test-${Random.nextInt(100)}"
-        }
+        val otherUser = s"test-${Random.nextInt(100)}"
+        val otherGroup = s"test-${Random.nextInt(100)}"
 
         // If file is owned by user's group and user's group has access permission
         status = fileStatus(otherUser, testGroups.head, NONE, READ_WRITE, NONE)