Commit 16b7b40

[SPARK-24948][SHS] Delegate check access permissions to the file system
In `SparkHadoopUtil.checkAccessPermission`, we consider only the basic POSIX permissions when checking whether a user can access a file. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally, so it can wrongly report that a user cannot access a file when in fact they can. This PR delegates the check of whether a file is accessible to the file system itself, so that the correct result is returned. A caching layer is added for performance reasons.

Tested with modified UTs.

Author: Marco Gaido <[email protected]>

Closes #21895 from mgaido91/SPARK-24948.
1 parent a5624c7 commit 16b7b40
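To make the change concrete, here is a minimal sketch, not the exact Spark code, of the pattern the history server moves to: rather than re-deriving owner/group/other permission bits on the client, the reader simply attempts the read and lets the file system, which knows about ACLs and its other internal policies, decide.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.AccessControlException

object AccessCheckSketch {
  // Attempt the read and let the file system perform the full access check
  // (permission bits, ACLs, and any other policies it applies internally).
  def canRead(fs: FileSystem, path: Path): Boolean = {
    try {
      val in = fs.open(path)
      in.close()
      true
    } catch {
      case _: AccessControlException => false // denied by the file system
    }
  }
}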

File tree: 4 files changed, +78 -130 lines

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
(0 additions, 22 deletions)

@@ -29,7 +29,6 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -379,27 +378,6 @@ class SparkHadoopUtil extends Logging {
     buffer.toString
   }
 
-  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
-    val perm = status.getPermission
-    val ugi = UserGroupInformation.getCurrentUser
-
-    if (ugi.getShortUserName == status.getOwner) {
-      if (perm.getUserAction.implies(mode)) {
-        return true
-      }
-    } else if (ugi.getGroupNames.contains(status.getGroup)) {
-      if (perm.getGroupAction.implies(mode)) {
-        return true
-      }
-    } else if (perm.getOtherAction.implies(mode)) {
-      return true
-    }
-
-    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
-      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
-      s"${if (status.isDirectory) "d" else "-"}$perm")
-    false
-  }
 }
 
 object SparkHadoopUtil {
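Hadoop also exposes a direct way to ask the file system for an access decision: since Hadoop 2.6, `FileSystem.access(path, mode)` performs the check server-side, ACLs included, and throws `AccessControlException` on denial. This patch instead catches the exception when the log is actually read, but a sketch of that API shows the contrast with the deleted client-side logic:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

object FsAccessSketch {
  // FileSystem.access delegates the decision to the file system itself, so
  // ACLs and other server-side policies are honored, unlike the deleted
  // client-side permission-bit logic above.
  def fsGrantsRead(fs: FileSystem, path: Path): Boolean =
    try {
      fs.access(path, FsAction.READ) // throws AccessControlException if denied
      true
    } catch {
      case _: AccessControlException => false
    }
}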

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
(37 additions, 8 deletions)

@@ -19,15 +19,17 @@ package org.apache.spark.deploy.history
 
 import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
+import scala.util.Try
 import scala.xml.Node
 
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
@@ -105,7 +107,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
 
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-  private val fs = new Path(logDir).getFileSystem(hadoopConf)
+  // Visible for testing
+  private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -129,6 +132,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
+  }
+
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -326,7 +348,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // the end-user.
         !entry.getPath().getName().startsWith(".") &&
           prevFileSize < entry.getLen() &&
-          SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+          !isBlacklisted(entry.getPath)
       }
       .flatMap { entry => Some(entry) }
       .sortWith { case (entry1, entry2) =>
@@ -337,13 +359,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
     }
 
-    var tasks = mutable.ListBuffer[Future[_]]()
+    var tasks = mutable.ListBuffer[(Future[Unit], Path)]()
 
    try {
      for (file <- logInfos) {
-        tasks += replayExecutor.submit(new Runnable {
+        val task: Future[Unit] = replayExecutor.submit(new Runnable {
          override def run(): Unit = mergeApplicationListing(file)
-        })
+        }, Unit)
+        tasks += (task -> file.getPath)
      }
    } catch {
      // let the iteration over logInfos break, since an exception on
@@ -356,7 +379,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
    pendingReplayTasksCount.addAndGet(tasks.size)
 
-    tasks.foreach { task =>
+    tasks.foreach { case (task, path) =>
      try {
        // Wait for all tasks to finish. This makes sure that checkForLogs
        // is not scheduled again while some tasks are already running in
@@ -365,6 +388,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
      } catch {
        case e: InterruptedException =>
          throw e
+        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+          // We don't have read permissions on the log file
+          logWarning(s"Unable to read log $path", e.getCause)
+          blacklist(path)
        case e: Exception =>
          logError("Exception while merging application listings", e)
      } finally {
@@ -587,6 +614,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    } catch {
      case t: Exception => logError("Exception in cleaning logs", t)
    }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
  }
 
  /**
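The caching layer added above reduces to a file-name-to-timestamp map plus periodic expiry. A self-contained sketch of the same pattern, with illustrative identifiers rather than the exact Spark ones:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Illustrative sketch of the blacklist-with-expiry pattern used above.
class ExpiringBlacklist(ttlSeconds: Long) {
  private val entries = new ConcurrentHashMap[String, Long]

  def add(name: String, nowMs: Long): Unit = entries.put(name, nowMs)

  def contains(name: String): Boolean = entries.containsKey(name)

  // Mirror of clearBlacklist: drop entries older than the TTL.
  def expire(nowMs: Long): Unit = {
    val threshold = nowMs - ttlSeconds * 1000
    entries.asScala.retain((_, addedAt) => addedAt >= threshold)
  }
}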

core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
(0 additions, 97 deletions)

This file was deleted.

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
(41 additions, 3 deletions)

@@ -27,11 +27,13 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -583,6 +585,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("SPARK-24948: blacklist files we don't have read permission on") {
+    val clock = new ManualClock(1533132471)
+    val provider = new FsHistoryProvider(createTestConf(), clock)
+    val accessDenied = newLogFile("accessDenied", None, inProgress = false)
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
+    val accessGranted = newLogFile("accessGranted", None, inProgress = false)
+    writeFile(accessGranted, true, None,
+      SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    val mockedFs = spy(provider.fs)
+    doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
+      argThat(new ArgumentMatcher[Path]() {
+        override def matches(path: Any): Boolean = {
+          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
+        }
+      }))
+    val mockedProvider = spy(provider)
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    // Doing 2 times in order to check the blacklist filter too
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    val accessDeniedPath = new Path(accessDenied.getPath)
+    assert(mockedProvider.isBlacklisted(accessDeniedPath))
+    clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    mockedProvider.cleanLogs()
+    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
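The test drives expiry deterministically through Spark's `ManualClock`, a `private[spark]` utility, so time advances only when the test says so. A tiny sketch of the technique; the one-day TTL and the package placement are assumptions for illustration:

package org.apache.spark.sketch // must live under org.apache.spark: ManualClock is private[spark]

import org.apache.spark.util.ManualClock

object ClockSketch {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock(1533132471L)        // fixed start time, as in the test
    val addedAt = clock.getTimeMillis()             // when an entry would be blacklisted
    clock.advance(24 * 60 * 60 * 1000 + 1)          // jump a bit past one day
    val ttlMs = 24L * 60 * 60 * 1000                // assumed one-day TTL
    assert(clock.getTimeMillis() - addedAt > ttlMs) // the entry now counts as expired
  }
}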
