
Commit e0628f2

Revert "[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS"
This reverts commit 6c65da6.
1 parent adbdb19 commit e0628f2

16 files changed (+111, -635 lines)

core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala

Lines changed: 0 additions & 105 deletions
This file was deleted.
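The deleted class's source is not shown in this diff, but its role is visible from the executor-side hunk further down: when `spark.yarn.credentials.file` is set, it periodically re-reads a credentials file from HDFS and merges the tokens into the executor's current user. A minimal sketch of that refresh step, assuming a hypothetical `refreshTokensFrom` helper and an illustrative file path (this is not the deleted implementation):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Sketch only: re-read a credentials file written by the driver/AM and merge
// its delegation tokens into the current user's credentials. The caller would
// pass the path from spark.yarn.credentials.file; here it is just a parameter.
def refreshTokensFrom(credentialsFile: Path, hadoopConf: Configuration): Unit = {
  val fs = credentialsFile.getFileSystem(hadoopConf)
  val in = fs.open(credentialsFile)    // FSDataInputStream extends DataInputStream
  try {
    val creds = new Credentials()
    creds.readTokenStorageStream(in)   // deserialize the tokens stored in the file
    UserGroupInformation.getCurrentUser.addCredentials(creds)
  } finally {
    in.close()
  }
}
```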

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 2 additions & 67 deletions
@@ -17,16 +17,12 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
-import java.util.{Arrays, Comparator}
 
-import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -36,16 +32,14 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
-import scala.concurrent.duration._
 
 /**
  * :: DeveloperApi ::
  * Contains util methods to interact with Hadoop from Spark.
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  private val sparkConf = new SparkConf()
-  val conf: Configuration = newConfiguration(sparkConf)
+  val conf: Configuration = newConfiguration(new SparkConf())
   UserGroupInformation.setConfiguration(conf)
 
   /**
@@ -207,61 +201,6 @@ class SparkHadoopUtil extends Logging {
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }
 
-  /**
-   * Lists all the files in a directory with the specified prefix, and does not end with the
-   * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
-   * the respective files.
-   */
-  def listFilesSorted(
-      remoteFs: FileSystem,
-      dir: Path,
-      prefix: String,
-      exclusionSuffix: String): Array[FileStatus] = {
-    val fileStatuses = remoteFs.listStatus(dir,
-      new PathFilter {
-        override def accept(path: Path): Boolean = {
-          val name = path.getName
-          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
-        }
-      })
-    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
-      override def compare(o1: FileStatus, o2: FileStatus): Int = {
-        Longs.compare(o1.getModificationTime, o2.getModificationTime)
-      }
-    })
-    fileStatuses
-  }
-
-  /**
-   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
-   * is valid the latest)?
-   * This will return -ve (or 0) value if the fraction of validity has already expired.
-   */
-  def getTimeFromNowToRenewal(
-      sparkConf: SparkConf,
-      fraction: Double,
-      credentials: Credentials): Long = {
-    val now = System.currentTimeMillis()
-
-    val renewalInterval =
-      sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
-
-    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .map { t =>
-        val identifier = new DelegationTokenIdentifier()
-        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
-      }.foldLeft(0L)(math.max)
-  }
-
-
-  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
-    val fileName = credentialsPath.getName
-    fileName.substring(
-      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
-  }
-
-
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
 
   /**
@@ -312,10 +251,6 @@ object SparkHadoopUtil {
     }
   }
 
-  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
-
-  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
-
   def get: SparkHadoopUtil = {
     hadoop
   }
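For context on what this hunk reverts: `listFilesSorted` was used to find the newest credentials file in a directory while skipping in-progress `.tmp` files. A rough usage sketch against the reverted API; the staging directory path and the `credentials` prefix are illustrative assumptions, not values taken from this commit:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// Sketch against the reverted helper: list credential files sorted by
// modification time and take the newest one. Directory and prefix are assumed.
val util = SparkHadoopUtil.get
val credsDir = new Path("hdfs:///user/spark/.sparkStaging/application_0000_0001")
val fs = credsDir.getFileSystem(util.conf)
val newest = util
  .listFilesSorted(fs, credsDir, "credentials", SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
  .lastOption  // sorted oldest to newest, so the last entry is the most recent
```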

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 0 additions & 4 deletions
@@ -401,10 +401,6 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
-      // Yarn client or cluster
-      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
-      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
-
       // Other options
       OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 0 additions & 15 deletions
@@ -63,8 +63,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
   var proxyUser: String = null
-  var principal: String = null
-  var keytab: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -395,12 +393,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PROXY_USER =>
         proxyUser = value
 
-      case PRINCIPAL =>
-        principal = value
-
-      case KEYTAB =>
-        keytab = value
-
       case HELP =>
         printUsageAndExit(0)
 
@@ -514,13 +506,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         | --num-executors NUM         Number of executors to launch (Default: 2).
         | --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                             working directory of each executor.
-        | --principal PRINCIPAL       Principal to be used to login to KDC, while running on
-        |                             secure HDFS.
-        | --keytab KEYTAB             The full path to the file that contains the keytab for the
-        |                             principal specified above. This keytab will be copied to
-        |                             the node running the Application Master via the Secure
-        |                             Distributed Cache, for renewing the login tickets and the
-        |                             delegation tokens periodically.
       """.stripMargin
     )
     SparkSubmit.exitFn()

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 12 deletions
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.rpc._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,16 +168,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           driverConf.set(key, value)
         }
       }
-      var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
-      if (driverConf.contains("spark.yarn.credentials.file")) {
-        logInfo("Will periodically update credentials from: " +
-          driverConf.get("spark.yarn.credentials.file"))
-        // Periodically update the credentials for this user to ensure HDFS tokens get updated.
-        tokenUpdaterOption =
-          Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
-        tokenUpdaterOption.get.updateCredentialsIfRequired()
-      }
-
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 
@@ -193,7 +183,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
-      tokenUpdaterOption.foreach(_.stop())
     }
   }
 

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 3 deletions
@@ -68,7 +68,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
-
     override protected def log = CoarseGrainedSchedulerBackend.this.log
 
     private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -113,7 +112,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
-
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -124,6 +122,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         } else {
           logInfo("Registered executor: " + executorRef + " with ID " + executorId)
           context.reply(RegisteredExecutor)
+
           addressToExecutorId(executorRef.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -244,7 +243,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         properties += ((key, value))
       }
     }
-
     // TODO (prashant) send conf instead of properties
     driverEndpoint = rpcEnv.setupEndpoint(
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))

docs/security.md

Lines changed: 0 additions & 2 deletions
@@ -32,8 +32,6 @@ SSL must be configured on each node and configured for each component involved i
 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
 
-For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
-
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
 
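The removed documentation paragraph describes the token renewal loop only in prose. As a rough illustration of the timing calculation behind it, using the reverted `SparkHadoopUtil.getTimeFromNowToRenewal` helper from the hunk above; the 0.75 fraction is an assumption for the example, not necessarily what the reverted AM code used:

```scala
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// Sketch: how long to wait before obtaining fresh delegation tokens, based on
// the HDFS tokens' issue dates and spark.yarn.token.renewal.interval.
val sparkConf = new SparkConf()
val creds = UserGroupInformation.getCurrentUser.getCredentials
val delayMs = SparkHadoopUtil.get.getTimeFromNowToRenewal(sparkConf, 0.75, creds)
if (delayMs <= 0) {
  // Already past the renewal point: obtain new tokens right away.
} else {
  // Otherwise schedule the next refresh delayMs milliseconds from now.
}
```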

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

Lines changed: 1 addition & 5 deletions
@@ -69,10 +69,8 @@ class SparkSubmitOptionParser {
   // YARN-only options.
   protected final String ARCHIVES = "--archives";
   protected final String EXECUTOR_CORES = "--executor-cores";
-  protected final String KEYTAB = "--keytab";
-  protected final String NUM_EXECUTORS = "--num-executors";
-  protected final String PRINCIPAL = "--principal";
   protected final String QUEUE = "--queue";
+  protected final String NUM_EXECUTORS = "--num-executors";
 
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -98,13 +96,11 @@ class SparkSubmitOptionParser {
     { EXECUTOR_MEMORY },
     { FILES },
     { JARS },
-    { KEYTAB },
     { KILL_SUBMISSION },
     { MASTER },
     { NAME },
     { NUM_EXECUTORS },
     { PACKAGES },
-    { PRINCIPAL },
     { PROPERTIES_FILE },
     { PROXY_USER },
     { PY_FILES },
{ PY_FILES },
