23 changes: 0 additions & 23 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -31,7 +31,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -367,28 +366,6 @@ class SparkHadoopUtil extends Logging {
buffer.toString
}

private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
val perm = status.getPermission
val ugi = UserGroupInformation.getCurrentUser

if (ugi.getShortUserName == status.getOwner) {
if (perm.getUserAction.implies(mode)) {
return true
}
} else if (ugi.getGroupNames.contains(status.getGroup)) {
if (perm.getGroupAction.implies(mode)) {
return true
}
} else if (perm.getOtherAction.implies(mode)) {
return true
}

logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
s"${if (status.isDirectory) "d" else "-"}$perm")
false
}

def serialize(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
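The deleted `checkAccessPermission` tries to reproduce the filesystem's authorization decision from the POSIX permission bits alone, which can disagree with what HDFS would actually allow (for example when ACLs grant access beyond owner/group/other). The rest of this PR replaces that pre-check with "try the read and react to the failure". A minimal sketch of that idea — standalone, with an illustrative `markUnreadable` callback rather than the provider's actual `blacklist` method:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.AccessControlException

// Sketch only: let the filesystem make the authorization decision instead of
// re-deriving it from permission bits on the client side.
def replayIfReadable(fs: FileSystem, path: Path)(markUnreadable: Path => Unit): Unit = {
  try {
    val in = fs.open(path)
    try {
      // ... replay the event log from `in` ...
    } finally {
      in.close()
    }
  } catch {
    case _: AccessControlException =>
      // The server said no: remember the file so later scans skip it.
      markUnreadable(path)
  }
}
```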
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -21,20 +21,20 @@ import java.io.{File, FileNotFoundException, IOException}
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions
import java.util.{Date, ServiceLoader}
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.ExecutionException
import scala.io.Source
import scala.util.Try
import scala.xml.Node

import com.fasterxml.jackson.annotation.JsonIgnore
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
@@ -114,7 +114,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private val fs = new Path(logDir).getFileSystem(hadoopConf)
// Visible for testing
private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

// Used by check event thread and clean log thread.
// Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -161,6 +162,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

private val blacklist = new ConcurrentHashMap[String, Long]

// Visible for testing
private[history] def isBlacklisted(path: Path): Boolean = {
blacklist.containsKey(path.getName)
}

private def blacklist(path: Path): Unit = {
blacklist.put(path.getName, clock.getTimeMillis())
}

/**
* Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
*/
private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
}

private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()

/**
@@ -418,7 +438,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// reading a garbage file is safe, but we would log an error which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
!isBlacklisted(entry.getPath)
}
.filter { entry =>
try {
@@ -461,32 +481,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
}

val tasks = updated.map { entry =>
val tasks = updated.flatMap { entry =>
try {
replayExecutor.submit(new Runnable {
val task: Future[Unit] = replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
})
}, Unit)
Some(task -> entry.getPath)
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting event log for replay", e)
null
None
}
}.filter(_ != null)
}

pendingReplayTasksCount.addAndGet(tasks.size)

// Wait for all tasks to finish. This makes sure that checkForLogs
// is not scheduled again while some tasks are already running in
// the replayExecutor.
tasks.foreach { task =>
tasks.foreach { case (task, path) =>
try {
task.get()
} catch {
case e: InterruptedException =>
throw e
case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log $path", e.getCause)
blacklist(path)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
@@ -779,6 +804,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.delete(classOf[LogInfo], log.logPath)
}
}
// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
Contributor
My only concern is that, if there happens to be a transient ACL issue when the file is first accessed, we will never see it in the application list even after the ACL is fixed, short of an SHS restart.
Wondering if the expiration used here could be a fraction of CLEAN_INTERVAL_S, so that these files have a chance of making it back into the app list without much overhead on the NameNode.

Contributor Author
@mgaido91 — Aug 3, 2018
This is scheduled every CLEAN_INTERVAL_S anyway, so I don't think changing the value here helps. We could define another config for the blacklist expiration, but that seems like overkill to me. I think it is very unlikely that a user changes permissions on these files, and when they do, they can always restart the SHS. Or we could clear the blacklist every fixed amount of time. Honestly, I don't have a strong opinion on which of these options is best.

Contributor
I misread it as MAX_LOG_AGE_S ... CLEAN_INTERVAL_S should be fine here, you are right.
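For clarity, here is a tiny standalone sketch of the expiry semantics being discussed (not part of the patch; the class and method names are illustrative): an entry added at time T survives every `clearExpired` call until more than `expireTimeInSeconds` has elapsed, so with `CLEAN_INTERVAL_S` a transiently unreadable file gets retried automatically after at most roughly two clean intervals, without an SHS restart.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Illustrative sketch of the blacklist expiry discussed above.
class Blacklist(now: () => Long) {
  private val entries = new ConcurrentHashMap[String, Long]()

  def add(name: String): Unit = entries.put(name, now())
  def contains(name: String): Boolean = entries.containsKey(name)

  // Drop entries older than expireTimeInSeconds; expired files become
  // candidates for reading again on the next scan.
  def clearExpired(expireTimeInSeconds: Long): Unit = {
    val threshold = now() - expireTimeInSeconds * 1000
    entries.asScala.retain((_, addedAt) => addedAt >= threshold)
  }
}
```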

}

/**
@@ -938,13 +965,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

private def deleteLog(log: Path): Unit = {
try {
fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
if (isBlacklisted(log)) {
logDebug(s"Skipping deleting $log as we don't have permissions on it.")
} else {
try {
fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
}
}
}

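Stepping back from the diff: the checkForLogs change above (the `flatMap` over `updated` and the `tasks.foreach` loop) keeps each submitted `Future` paired with its `Path`, so a failure surfacing as an `ExecutionException` wrapping an `AccessControlException` can be attributed to one specific file, and only that file is blacklisted. A hedged sketch of that pattern — the helper and callback names (`runAll`, `onAccessDenied`) are illustrative, not from the patch:

```scala
import java.util.concurrent.{ExecutionException, ExecutorService, Future}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.AccessControlException

// Sketch of the submit-then-wait pattern: pair each Future with the Path it is
// working on so permission failures can be traced back to a single file.
def runAll(executor: ExecutorService, paths: Seq[Path])
          (replay: Path => Unit, onAccessDenied: Path => Unit): Unit = {
  val tasks: Seq[(Future[_], Path)] = paths.flatMap { path =>
    try {
      Some(executor.submit(new Runnable {
        override def run(): Unit = replay(path)
      }) -> path)
    } catch {
      // The executor cannot accept more work right now; this entry is skipped.
      case _: Exception => None
    }
  }

  tasks.foreach { case (task, path) =>
    try {
      task.get()
    } catch {
      case e: InterruptedException => throw e
      case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
        onAccessDenied(path) // no read permission on this particular log file
      case _: Exception => // other replay failures would just be logged
    }
  }
}
```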

This file was deleted.

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -29,9 +29,11 @@ import scala.language.postfixOps
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, spy, verify}
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, argThat}
import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@@ -818,6 +820,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}

test("SPARK-24948: blacklist files we don't have read permission on") {
val clock = new ManualClock(1533132471)
val provider = new FsHistoryProvider(createTestConf(), clock)
val accessDenied = newLogFile("accessDenied", None, inProgress = false)
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
val accessGranted = newLogFile("accessGranted", None, inProgress = false)
writeFile(accessGranted, true, None,
SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
val mockedFs = spy(provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
argThat(new ArgumentMatcher[Path]() {
override def matches(path: Any): Boolean = {
path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
}
}))
val mockedProvider = spy(provider)
when(mockedProvider.fs).thenReturn(mockedFs)
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
// Doing 2 times in order to check the blacklist filter too
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
val accessDeniedPath = new Path(accessDenied.getPath)
assert(mockedProvider.isBlacklisted(accessDeniedPath))
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
mockedProvider.cleanLogs()
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
}
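A small, hypothetical refactor of the matcher above (not part of the patch): with the Mockito 1.x `argThat`/`ArgumentMatcher` combination this test already uses, the anonymous matcher could be factored into a reusable helper when several stubs need to match a `Path` by file name.

```scala
import org.apache.hadoop.fs.Path
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.argThat

// Hypothetical helper: match any Path whose file name equals `name`, ignoring case.
def pathNamed(name: String): Path = argThat(new ArgumentMatcher[Path]() {
  override def matches(arg: Any): Boolean =
    arg.isInstanceOf[Path] && arg.asInstanceOf[Path].getName.equalsIgnoreCase(name)
})

// Usage (equivalent to the stubbing in the test above):
// doThrow(new AccessControlException("Cannot read accessDenied file"))
//   .when(mockedFs).open(pathNamed("accessDenied"))
```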

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: