From 1052c1765e075deaf76f0fedfe805808ce669b5b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 27 Jul 2018 18:01:15 +0200 Subject: [PATCH 1/9] [SPARK-24948][SHS] Delegate check access permissions to the file system --- .../apache/spark/deploy/SparkHadoopUtil.scala | 5 +++ .../spark/deploy/SparkHadoopUtilSuite.scala | 37 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 8353e64a619cf..0bac664a7b657 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.Map import scala.collection.mutable import scala.collection.mutable.HashMap +import scala.util.Try import scala.util.control.NonFatal import com.google.common.primitives.Longs @@ -383,6 +384,10 @@ class SparkHadoopUtil extends Logging { return true } + // We may still be able to access the file as ACL may be enabled or spark may be an admin user + if (Try(status.getPath.getFileSystem(conf).access(status.getPath, mode)).isSuccess) { + return true + } logDebug(s"Permission denied: user=${ugi.getShortUserName}, " + s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" + s"${if (status.isDirectory) "d" else "-"}$perm") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala index ab24a76e20a30..03dc0709a806c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala @@ -21,9 +21,12 @@ import java.security.PrivilegedExceptionAction import scala.util.Random -import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.fs.permission.{FsAction, FsPermission} -import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.{AccessControlException, UserGroupInformation} +import org.mockito +import org.mockito.internal.stubbing.answers.DoesNothing +import org.mockito.Mockito._ import org.scalatest.Matchers import org.apache.spark.SparkFunSuite @@ -72,6 +75,14 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers { sparkHadoopUtil.checkAccessPermission(status, READ) should be(false) sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false) + status = mockedStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE, true) + sparkHadoopUtil.checkAccessPermission(status, READ) should be(true) + sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true) + + status = mockedStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE, false) + sparkHadoopUtil.checkAccessPermission(status, READ) should be(false) + sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false) + null } }) @@ -94,4 +105,26 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers { group, null) } + + private def mockedStatus( + owner: String, + group: String, + userAction: FsAction, + groupAction: FsAction, + otherAction: FsAction, + accessGranted: Boolean): FileStatus = { + val mockedFs = mock(classOf[FileSystem]) + val stub = when(mockedFs.access(mockito.Matchers.any(), mockito.Matchers.any())) + if (accessGranted) { + stub.thenAnswer(new DoesNothing) + } else { + 
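+      // Simulate a denied check: FileSystem.access signals a missing permission by
+      // throwing AccessControlException rather than returning a value, so the stub
+      // below mirrors what a real HDFS NameNode would do.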
stub.thenThrow(new AccessControlException) + } + val mockedPath = mock(classOf[Path]) + when(mockedPath.getFileSystem(mockito.Matchers.any())).thenReturn(mockedFs) + // This status has no access permission, here we modify only the Path it returns + val mockedStatus = spy(fileStatus(owner, group, userAction, groupAction, otherAction)) + when(mockedStatus.getPath).thenReturn(mockedPath) + mockedStatus + } } From ef42a93e3a7f298d7e0e35a98d244762c659dcab Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 27 Jul 2018 18:30:02 +0200 Subject: [PATCH 2/9] fix scalastyle --- .../scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala index 03dc0709a806c..be288c208197f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala @@ -25,8 +25,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.fs.permission.{FsAction, FsPermission} import org.apache.hadoop.security.{AccessControlException, UserGroupInformation} import org.mockito -import org.mockito.internal.stubbing.answers.DoesNothing import org.mockito.Mockito._ +import org.mockito.internal.stubbing.answers.DoesNothing import org.scalatest.Matchers import org.apache.spark.SparkFunSuite From bf3f4dc051a0d9fcd5fb4af7baa61ba4bf807792 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sat, 28 Jul 2018 17:36:27 +0200 Subject: [PATCH 3/9] use cache layer in order to avoid to access the remote file system every time --- .../apache/spark/deploy/SparkHadoopUtil.scala | 26 ---- .../deploy/history/FsHistoryProvider.scala | 44 +++++- .../spark/deploy/SparkHadoopUtilSuite.scala | 130 ------------------ .../history/FsHistoryProviderSuite.scala | 29 +++- 4 files changed, 65 insertions(+), 164 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 0bac664a7b657..03d8839f2c211 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -368,32 +368,6 @@ class SparkHadoopUtil extends Logging { buffer.toString } - private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = { - val perm = status.getPermission - val ugi = UserGroupInformation.getCurrentUser - - if (ugi.getShortUserName == status.getOwner) { - if (perm.getUserAction.implies(mode)) { - return true - } - } else if (ugi.getGroupNames.contains(status.getGroup)) { - if (perm.getGroupAction.implies(mode)) { - return true - } - } else if (perm.getOtherAction.implies(mode)) { - return true - } - - // We may still be able to access the file as ACL may be enabled or spark may be an admin user - if (Try(status.getPath.getFileSystem(conf).access(status.getPath, mode)).isSuccess) { - return true - } - logDebug(s"Permission denied: user=${ugi.getShortUserName}, " + - s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" + - s"${if (status.isDirectory) "d" else "-"}$perm") - false - } - def serialize(creds: Credentials): Array[Byte] = { val byteStream = new ByteArrayOutputStream val dataStream = new DataOutputStream(byteStream) diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index bf1eeb0c1bf59..3ff98f0ac9ba0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -27,17 +27,17 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.io.Source -import scala.util.Try +import scala.util.{Failure, Success, Try} import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.MoreExecutors -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.hdfs.protocol.HdfsConstants -import org.apache.hadoop.security.AccessControlException +import org.apache.hadoop.security.{AccessControlException, UserGroupInformation} import org.fusesource.leveldbjni.internal.NativeDB import org.apache.spark.{SecurityManager, SparkConf, SparkException} @@ -81,7 +81,7 @@ import org.apache.spark.util.kvstore._ * maintains this invariant. */ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) - extends ApplicationHistoryProvider with Logging { + extends ApplicationHistoryProvider with CachedFileSystemHelper with Logging { def this(conf: SparkConf) = { this(conf, new SystemClock()) @@ -114,7 +114,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString) private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - private val fs = new Path(logDir).getFileSystem(hadoopConf) + protected val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf) // Used by check event thread and clean log thread. // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs @@ -418,7 +418,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && - SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) + checkAccessPermission(entry.getPath, FsAction.READ) } .filter { entry => try { @@ -973,6 +973,38 @@ private[history] object FsHistoryProvider { private[history] val CURRENT_LISTING_VERSION = 1L } +private[history] trait CachedFileSystemHelper extends Logging { + protected def fs: FileSystem + + /** + * Cache containing the result for the already checked files. + */ + // Visible for testing. 
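+  // Keyed by file name; a hit avoids a round trip to the (possibly remote) file
+  // system. Only definitive outcomes are cached: transient failures are left out
+  // below so the permission check can be retried on a later scan.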
+ private[history] val cache = new mutable.HashMap[String, Boolean] + + private val userName = UserGroupInformation.getCurrentUser.getShortUserName + + private[history] def checkAccessPermission(path: Path, mode: FsAction): Boolean = { + cache.getOrElse(path.getName, doCheckAccessPermission(path, mode)) + } + + private def doCheckAccessPermission(path: Path, mode: FsAction): Boolean = { + Try(fs.access(path, mode)) match { + case Success(_) => + cache(path.getName) = true + true + case Failure(e: AccessControlException) => + logInfo(s"Permission denied for user '$userName' to access $path", e) + cache(path.getName) = false + false + case Failure(_) => + // When we are unable to check whether we can access the file we don't cache the result + // so we can retry later + false + } + } +} + private[history] case class FsHistoryProviderMetadata( version: Long, uiVersion: Long, diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala deleted file mode 100644 index be288c208197f..0000000000000 --- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy - -import java.security.PrivilegedExceptionAction - -import scala.util.Random - -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.hadoop.fs.permission.{FsAction, FsPermission} -import org.apache.hadoop.security.{AccessControlException, UserGroupInformation} -import org.mockito -import org.mockito.Mockito._ -import org.mockito.internal.stubbing.answers.DoesNothing -import org.scalatest.Matchers - -import org.apache.spark.SparkFunSuite - -class SparkHadoopUtilSuite extends SparkFunSuite with Matchers { - test("check file permission") { - import FsAction._ - val testUser = s"user-${Random.nextInt(100)}" - val testGroups = Array(s"group-${Random.nextInt(100)}") - val testUgi = UserGroupInformation.createUserForTesting(testUser, testGroups) - - testUgi.doAs(new PrivilegedExceptionAction[Void] { - override def run(): Void = { - val sparkHadoopUtil = new SparkHadoopUtil - - // If file is owned by user and user has access permission - var status = fileStatus(testUser, testGroups.head, READ_WRITE, READ_WRITE, NONE) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(true) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true) - - // If file is owned by user but user has no access permission - status = fileStatus(testUser, testGroups.head, NONE, READ_WRITE, NONE) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(false) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false) - - val otherUser = s"test-${Random.nextInt(100)}" - val otherGroup = s"test-${Random.nextInt(100)}" - - // If file is owned by user's group and user's group has access permission - status = fileStatus(otherUser, testGroups.head, NONE, READ_WRITE, NONE) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(true) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true) - - // If file is owned by user's group but user's group has no access permission - status = fileStatus(otherUser, testGroups.head, READ_WRITE, NONE, NONE) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(false) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false) - - // If file is owned by other user and this user has access permission - status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, READ_WRITE) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(true) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true) - - // If file is owned by other user but this user has no access permission - status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(false) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false) - - status = mockedStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE, true) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(true) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true) - - status = mockedStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE, false) - sparkHadoopUtil.checkAccessPermission(status, READ) should be(false) - sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false) - - null - } - }) - } - - private def fileStatus( - owner: String, - group: String, - userAction: FsAction, - groupAction: FsAction, - otherAction: FsAction): FileStatus = { - new FileStatus(0L, - false, - 0, - 0L, - 0L, - 0L, - new FsPermission(userAction, groupAction, otherAction), - owner, - 
group, - null) - } - - private def mockedStatus( - owner: String, - group: String, - userAction: FsAction, - groupAction: FsAction, - otherAction: FsAction, - accessGranted: Boolean): FileStatus = { - val mockedFs = mock(classOf[FileSystem]) - val stub = when(mockedFs.access(mockito.Matchers.any(), mockito.Matchers.any())) - if (accessGranted) { - stub.thenAnswer(new DoesNothing) - } else { - stub.thenThrow(new AccessControlException) - } - val mockedPath = mock(classOf[Path]) - when(mockedPath.getFileSystem(mockito.Matchers.any())).thenReturn(mockedFs) - // This status has no access permission, here we modify only the Path it returns - val mockedStatus = spy(fileStatus(owner, group, userAction, groupAction, otherAction)) - when(mockedStatus.getPath).thenReturn(mockedPath) - mockedStatus - } -} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 77b239489d489..875d1ea3abc8f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -27,11 +27,15 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.security.AccessControlException import org.json4s.jackson.JsonMethods._ import org.mockito.Matchers.any -import org.mockito.Mockito.{mock, spy, verify} +import org.mockito.Mockito.{mock, spy, verify, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -818,6 +822,27 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("SPARK-24948: delegate permission check to the file system class") { + val helper = new CachedFileSystemHelper { + override protected val fs: FileSystem = mock(classOf[FileSystem]) + when(fs.access(any[Path], any[FsAction])).thenAnswer(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + invocation.getArgumentAt(0, classOf[Path]).getName match { + case "accessGranted" => + case "accessDenied" => throw new AccessControlException("File not found.") + case _ => throw new FileNotFoundException("File not found.") + } + } + }) + } + assert(helper.checkAccessPermission(new Path("accessGranted"), FsAction.READ)) + assert(helper.cache("accessGranted")) + assert(!helper.checkAccessPermission(new Path("accessDenied"), FsAction.READ)) + assert(!helper.cache("accessDenied")) + assert(!helper.checkAccessPermission(new Path("nonExisting"), FsAction.READ)) + assert(!helper.cache.contains("nonExisting")) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. 
Example: From 480e3268f51dbd830f5ee5013efd2315f3aff3a9 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sat, 28 Jul 2018 17:37:22 +0200 Subject: [PATCH 4/9] cleanup --- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 03d8839f2c211..70a8c659bbdd3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -26,13 +26,11 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.Map import scala.collection.mutable import scala.collection.mutable.HashMap -import scala.util.Try import scala.util.control.NonFatal import com.google.common.primitives.Longs import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} -import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.{Token, TokenIdentifier} From 2ad52858ba9d17e26e6d7ea11658515af7a02a03 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 29 Jul 2018 00:12:59 +0200 Subject: [PATCH 5/9] address comment: use LRU cache in order to avoid OOM --- .../deploy/history/FsHistoryProvider.scala | 21 +++++++++++++------ .../history/FsHistoryProviderSuite.scala | 12 ++++++++--- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3ff98f0ac9ba0..3ebd758818fb3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -31,6 +31,7 @@ import scala.util.{Failure, Success, Try} import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore +import com.google.common.cache.CacheBuilder import com.google.common.io.ByteStreams import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} @@ -113,6 +114,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString + "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString) + protected val expireTimeInSeconds = conf.get(MAX_LOG_AGE_S) + private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) protected val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf) @@ -733,7 +736,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * Delete event logs from the log directory according to the clean policy defined by the user. */ private[history] def cleanLogs(): Unit = Utils.tryLog { - val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + val maxTime = clock.getTimeMillis() - expireTimeInSeconds * 1000 val expired = listing.view(classOf[ApplicationInfoWrapper]) .index("oldestAttempt") @@ -779,6 +782,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.delete(classOf[LogInfo], log.logPath) } } + // Ensure the cache gets rid of the expired entries. 
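+    // Guava caches evict expired entries lazily, piggybacking on reads and writes,
+    // so an explicit cleanUp() is needed to reclaim them promptly.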
+ cache.cleanUp() } /** @@ -975,27 +980,31 @@ private[history] object FsHistoryProvider { private[history] trait CachedFileSystemHelper extends Logging { protected def fs: FileSystem + protected def expireTimeInSeconds: Long /** - * Cache containing the result for the already checked files. + * LRU cache containing the result for the already checked files. */ // Visible for testing. - private[history] val cache = new mutable.HashMap[String, Boolean] + private[history] val cache = CacheBuilder.newBuilder() + .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS) + .build[String, java.lang.Boolean]() private val userName = UserGroupInformation.getCurrentUser.getShortUserName private[history] def checkAccessPermission(path: Path, mode: FsAction): Boolean = { - cache.getOrElse(path.getName, doCheckAccessPermission(path, mode)) + Option(cache.getIfPresent(path.getName)).map(_.booleanValue()) + .getOrElse(doCheckAccessPermission(path, mode)) } private def doCheckAccessPermission(path: Path, mode: FsAction): Boolean = { Try(fs.access(path, mode)) match { case Success(_) => - cache(path.getName) = true + cache.put(path.getName, true) true case Failure(e: AccessControlException) => logInfo(s"Permission denied for user '$userName' to access $path", e) - cache(path.getName) = false + cache.put(path.getName, false) false case Failure(_) => // When we are unable to check whether we can access the file we don't cache the result diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 875d1ea3abc8f..628cecb8bc402 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -834,13 +834,19 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } }) + + override protected def expireTimeInSeconds = 5L } assert(helper.checkAccessPermission(new Path("accessGranted"), FsAction.READ)) - assert(helper.cache("accessGranted")) + assert(helper.cache.getIfPresent("accessGranted")) assert(!helper.checkAccessPermission(new Path("accessDenied"), FsAction.READ)) - assert(!helper.cache("accessDenied")) + assert(!helper.cache.getIfPresent("accessDenied")) assert(!helper.checkAccessPermission(new Path("nonExisting"), FsAction.READ)) - assert(!helper.cache.contains("nonExisting")) + assert(helper.cache.getIfPresent("nonExisting") == null) + Thread.sleep(5000) // wait for the cache entries to expire + helper.cache.cleanUp() + assert(helper.cache.getIfPresent("accessGranted") == null) + assert(helper.cache.getIfPresent("accessGranted") == null) } /** From aec9b8682ac33216743ad3d8d522398ef9922444 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 1 Aug 2018 16:37:37 +0200 Subject: [PATCH 6/9] check access when reading and use blacklist --- .../deploy/history/FsHistoryProvider.scala | 107 +++++++++--------- .../history/FsHistoryProviderSuite.scala | 67 ++++++----- 2 files changed, 92 insertions(+), 82 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3ebd758818fb3..911de71cf2b54 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -21,24 +21,23 @@ import java.io.{File, FileNotFoundException, 
IOException} import java.nio.file.Files import java.nio.file.attribute.PosixFilePermissions import java.util.{Date, ServiceLoader} -import java.util.concurrent.{ExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.concurrent.ExecutionException import scala.io.Source -import scala.util.{Failure, Success, Try} +import scala.util.Try import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore -import com.google.common.cache.CacheBuilder import com.google.common.io.ByteStreams import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.hdfs.protocol.HdfsConstants -import org.apache.hadoop.security.{AccessControlException, UserGroupInformation} +import org.apache.hadoop.security.AccessControlException import org.fusesource.leveldbjni.internal.NativeDB import org.apache.spark.{SecurityManager, SparkConf, SparkException} @@ -81,8 +80,8 @@ import org.apache.spark.util.kvstore._ * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly * maintains this invariant. */ -private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) - extends ApplicationHistoryProvider with CachedFileSystemHelper with Logging { +private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock) + extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging { def this(conf: SparkConf) = { this(conf, new SystemClock()) @@ -114,10 +113,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString + "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString) - protected val expireTimeInSeconds = conf.get(MAX_LOG_AGE_S) - private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - protected val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf) + // Visible for testing + private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf) // Used by check event thread and clean log thread. // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs @@ -421,7 +419,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && - checkAccessPermission(entry.getPath, FsAction.READ) + !isBlacklisted(entry.getPath) } .filter { entry => try { @@ -464,32 +462,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}") } - val tasks = updated.map { entry => + val tasks = updated.flatMap { entry => try { - replayExecutor.submit(new Runnable { + val task: Future[Unit] = replayExecutor.submit(new Runnable { override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true) - }) + }, Unit) + Some(task -> entry.getPath) } catch { // let the iteration over the updated entries break, since an exception on // replayExecutor.submit (..) 
indicates the ExecutorService is unable // to take any more submissions at this time case e: Exception => logError(s"Exception while submitting event log for replay", e) - null + None } - }.filter(_ != null) + } pendingReplayTasksCount.addAndGet(tasks.size) // Wait for all tasks to finish. This makes sure that checkForLogs // is not scheduled again while some tasks are already running in // the replayExecutor. - tasks.foreach { task => + tasks.foreach { case (task, path) => try { task.get() } catch { case e: InterruptedException => throw e + case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] => + // We don't have read permissions on the log file + logDebug(s"Unable to read log $path", e.getCause) + blacklist(path) case e: Exception => logError("Exception while merging application listings", e) } finally { @@ -736,7 +739,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * Delete event logs from the log directory according to the clean policy defined by the user. */ private[history] def cleanLogs(): Unit = Utils.tryLog { - val maxTime = clock.getTimeMillis() - expireTimeInSeconds * 1000 + val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 val expired = listing.view(classOf[ApplicationInfoWrapper]) .index("oldestAttempt") @@ -782,8 +785,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.delete(classOf[LogInfo], log.logPath) } } - // Ensure the cache gets rid of the expired entries. - cache.cleanUp() + // Clean the blacklist from the expired entries. + clearBlacklist(CLEAN_INTERVAL_S) } /** @@ -943,13 +946,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } private def deleteLog(log: Path): Unit = { - try { - fs.delete(log, true) - } catch { - case _: AccessControlException => - logInfo(s"No permission to delete $log, ignoring.") - case ioe: IOException => - logError(s"IOException in cleaning $log", ioe) + if (isBlacklisted(log)) { + logDebug(s"Skipping deleting $log as we don't have permissions on it.") + } else { + try { + fs.delete(log, true) + } catch { + case _: AccessControlException => + logInfo(s"No permission to delete $log, ignoring.") + case ioe: IOException => + logError(s"IOException in cleaning $log", ioe) + } } } @@ -978,39 +985,35 @@ private[history] object FsHistoryProvider { private[history] val CURRENT_LISTING_VERSION = 1L } -private[history] trait CachedFileSystemHelper extends Logging { - protected def fs: FileSystem - protected def expireTimeInSeconds: Long +/** + * Manages a blacklist containing the files which cannot be read due to lack of access permissions. + */ +private[history] trait LogFilesBlacklisting extends Logging { + protected def clock: Clock /** - * LRU cache containing the result for the already checked files. + * Contains the name of blacklisted files and their insertion time. */ - // Visible for testing. 
- private[history] val cache = CacheBuilder.newBuilder() - .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS) - .build[String, java.lang.Boolean]() + private val blacklist = new ConcurrentHashMap[String, Long] - private val userName = UserGroupInformation.getCurrentUser.getShortUserName + private[history] def isBlacklisted(path: Path): Boolean = { + blacklist.containsKey(path.getName) + } - private[history] def checkAccessPermission(path: Path, mode: FsAction): Boolean = { - Option(cache.getIfPresent(path.getName)).map(_.booleanValue()) - .getOrElse(doCheckAccessPermission(path, mode)) + private[history] def blacklist(path: Path): Unit = { + blacklist.put(path.getName, clock.getTimeMillis()) } - private def doCheckAccessPermission(path: Path, mode: FsAction): Boolean = { - Try(fs.access(path, mode)) match { - case Success(_) => - cache.put(path.getName, true) - true - case Failure(e: AccessControlException) => - logInfo(s"Permission denied for user '$userName' to access $path", e) - cache.put(path.getName, false) - false - case Failure(_) => - // When we are unable to check whether we can access the file we don't cache the result - // so we can retry later - false + /** + * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`. + */ + protected def clearBlacklist(expireTimeInSeconds: Long): Unit = { + val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000 + val expired = new mutable.ArrayBuffer[String] + blacklist.asScala.foreach { + case (path, creationTime) if creationTime < expiredThreshold => expired += path } + expired.foreach(blacklist.remove(_)) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 628cecb8bc402..b4eba755eccbf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -27,15 +27,13 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.security.AccessControlException import org.json4s.jackson.JsonMethods._ -import org.mockito.Matchers.any -import org.mockito.Mockito.{mock, spy, verify, when} -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer +import org.mockito.ArgumentMatcher +import org.mockito.Matchers.{any, argThat} +import org.mockito.Mockito.{doThrow, mock, spy, verify, when} import org.scalatest.BeforeAndAfter import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -822,31 +820,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } - test("SPARK-24948: delegate permission check to the file system class") { - val helper = new CachedFileSystemHelper { - override protected val fs: FileSystem = mock(classOf[FileSystem]) - when(fs.access(any[Path], any[FsAction])).thenAnswer(new Answer[Unit] { - override def answer(invocation: InvocationOnMock): Unit = { - invocation.getArgumentAt(0, classOf[Path]).getName match { - case "accessGranted" => - case "accessDenied" => throw new AccessControlException("File not found.") - case _ => throw new FileNotFoundException("File not 
found.") - } + test("SPARK-24948: blacklist files we don't have read permission on") { + val clock = new ManualClock(1533132471) + val provider = new FsHistoryProvider(createTestConf(), clock) + val accessDenied = newLogFile("accessDenied", None, inProgress = false) + writeFile(accessDenied, true, None, + SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None)) + val accessGranted = newLogFile("accessGranted", None, inProgress = false) + writeFile(accessGranted, true, None, + SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None), + SparkListenerApplicationEnd(5L)) + val mockedFs = spy(provider.fs) + doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open( + argThat(new ArgumentMatcher[Path]() { + override def matches(path: Any): Boolean = { + path.asInstanceOf[Path].getName.toLowerCase == "accessdenied" } - }) - - override protected def expireTimeInSeconds = 5L - } - assert(helper.checkAccessPermission(new Path("accessGranted"), FsAction.READ)) - assert(helper.cache.getIfPresent("accessGranted")) - assert(!helper.checkAccessPermission(new Path("accessDenied"), FsAction.READ)) - assert(!helper.cache.getIfPresent("accessDenied")) - assert(!helper.checkAccessPermission(new Path("nonExisting"), FsAction.READ)) - assert(helper.cache.getIfPresent("nonExisting") == null) - Thread.sleep(5000) // wait for the cache entries to expire - helper.cache.cleanUp() - assert(helper.cache.getIfPresent("accessGranted") == null) - assert(helper.cache.getIfPresent("accessGranted") == null) + })) + val mockedProvider = spy(provider) + when(mockedProvider.fs).thenReturn(mockedFs) + updateAndCheck(mockedProvider) { list => + list.size should be(1) + } + writeFile(accessDenied, true, None, + SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None), + SparkListenerApplicationEnd(5L)) + // Doing 2 times in order to check the blacklist filter too + updateAndCheck(mockedProvider) { list => + list.size should be(1) + } + val accessDeniedPath = new Path(accessDenied.getPath) + assert(mockedProvider.isBlacklisted(accessDeniedPath)) + clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d + mockedProvider.cleanLogs() + assert(!mockedProvider.isBlacklisted(accessDeniedPath)) } /** From c620fff90d20ba1b62e1277317754d5f14567f79 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 2 Aug 2018 12:28:31 +0200 Subject: [PATCH 7/9] address comment --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 911de71cf2b54..b87333c22757a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -491,7 +491,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: C throw e case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] => // We don't have read permissions on the log file - logDebug(s"Unable to read log $path", e.getCause) + logWarning(s"Unable to read log $path", e.getCause) blacklist(path) case e: Exception => logError("Exception while merging application listings", e) From 0a48f9a343720cd4549e4937af64ab97d4ef1934 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 3 Aug 2018 09:15:22 +0200 Subject: 
[PATCH 8/9] address comment --- .../deploy/history/FsHistoryProvider.scala | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b87333c22757a..2ee1469441e14 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -80,8 +80,8 @@ import org.apache.spark.util.kvstore._ * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly * maintains this invariant. */ -private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock) - extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging { +private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) + extends ApplicationHistoryProvider with Logging { def this(conf: SparkConf) = { this(conf, new SystemClock()) @@ -162,6 +162,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: C new HistoryServerDiskManager(conf, path, listing, clock) } + private val blacklist = new ConcurrentHashMap[String, Long] + + // Visible for testing + private[history] def isBlacklisted(path: Path): Boolean = { + blacklist.containsKey(path.getName) + } + + private def blacklist(path: Path): Unit = { + blacklist.put(path.getName, clock.getTimeMillis()) + } + + /** + * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`. + */ + private def clearBlacklist(expireTimeInSeconds: Long): Unit = { + val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000 + val expired = new mutable.ArrayBuffer[String] + blacklist.asScala.foreach { + case (path, creationTime) if creationTime < expiredThreshold => expired += path + } + expired.foreach(blacklist.remove(_)) + } + private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]() /** @@ -985,38 +1008,6 @@ private[history] object FsHistoryProvider { private[history] val CURRENT_LISTING_VERSION = 1L } -/** - * Manages a blacklist containing the files which cannot be read due to lack of access permissions. - */ -private[history] trait LogFilesBlacklisting extends Logging { - protected def clock: Clock - - /** - * Contains the name of blacklisted files and their insertion time. - */ - private val blacklist = new ConcurrentHashMap[String, Long] - - private[history] def isBlacklisted(path: Path): Boolean = { - blacklist.containsKey(path.getName) - } - - private[history] def blacklist(path: Path): Unit = { - blacklist.put(path.getName, clock.getTimeMillis()) - } - - /** - * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`. 
- */ - protected def clearBlacklist(expireTimeInSeconds: Long): Unit = { - val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000 - val expired = new mutable.ArrayBuffer[String] - blacklist.asScala.foreach { - case (path, creationTime) if creationTime < expiredThreshold => expired += path - } - expired.foreach(blacklist.remove(_)) - } -} - private[history] case class FsHistoryProviderMetadata( version: Long, uiVersion: Long, From 14ae790350b88c9ed64d63bd67cf21a22f8ffdd9 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 3 Aug 2018 13:01:13 +0200 Subject: [PATCH 9/9] address comment --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 2ee1469441e14..44d23908146c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -178,11 +178,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) */ private def clearBlacklist(expireTimeInSeconds: Long): Unit = { val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000 - val expired = new mutable.ArrayBuffer[String] - blacklist.asScala.foreach { - case (path, creationTime) if creationTime < expiredThreshold => expired += path - } - expired.foreach(blacklist.remove(_)) + blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold) } private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
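After nine revisions the series settles on an optimistic scheme: the history server no longer pre-checks permissions at all. It simply attempts to read each event log, blacklists a path when the read fails with AccessControlException, skips blacklisted paths on subsequent scans, and ages entries out so a later permission grant is eventually picked up. The sketch below condenses that mechanism into a standalone class. It is illustrative only: BlacklistingScanner, blacklistFile, and tryRead are hypothetical names, and Clock is a trivial stand-in for Spark's internal clock abstraction.

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.JavaConverters._

    import org.apache.hadoop.security.AccessControlException

    // Minimal stand-in for Spark's Clock, for illustration only.
    trait Clock { def getTimeMillis(): Long }

    class BlacklistingScanner(clock: Clock) {
      // File name -> timestamp (ms) at which the entry was blacklisted.
      private val blacklist = new ConcurrentHashMap[String, Long]

      def isBlacklisted(name: String): Boolean = blacklist.containsKey(name)

      def blacklistFile(name: String): Unit = blacklist.put(name, clock.getTimeMillis())

      // Age out old entries so a permission change is eventually noticed again;
      // mirrors the retain-based clearBlacklist of the final patch.
      def clearBlacklist(expireTimeInSeconds: Long): Unit = {
        val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
        blacklist.asScala.retain((_, addedAt) => addedAt >= expiredThreshold)
      }

      // Optimistic read: attempt the operation and blacklist only on a definitive
      // permission failure, leaving other errors to be retried on the next scan.
      def tryRead(name: String)(read: String => Unit): Boolean = {
        if (isBlacklisted(name)) {
          false
        } else {
          try {
            read(name)
            true
          } catch {
            case _: AccessControlException =>
              blacklistFile(name)
              false
          }
        }
      }
    }

In the provider itself the same pattern is spread across three sites: checkForLogs filters scanned entries on isBlacklisted, a replay task that fails with an AccessControlException causes its path to be blacklisted, deleteLog skips blacklisted paths, and cleanLogs calls clearBlacklist(CLEAN_INTERVAL_S) so entries expire.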