Merged

Changes from all commits (19 commits)
71170e7
[SPARK-26871][SQL] File Source V2: avoid creating unnecessary FileInd…
gengliangwang Feb 15, 2019
b6c6875
[SPARK-26790][CORE] Change approach for retrieving executor logs and …
HeartSaVioR Feb 15, 2019
28ced38
[SPARK-26772][YARN] Delete ServiceCredentialProvider and make HadoopD…
gaborgsomogyi Feb 15, 2019
3d6066e
[SPARK-21094][PYTHON] Add popen_kwargs to launch_gateway
parente Feb 16, 2019
4cabab8
[SPARK-26673][FOLLOWUP][SQL] File source V2: remove duplicated broadc…
gengliangwang Feb 16, 2019
4dce45a
[SPARK-26744][SQL] Support schema validation in FileDataSourceV2 fram…
gengliangwang Feb 16, 2019
5d8a934
[SPARK-26721][ML] Avoid per-tree normalization in featureImportance f…
mgaido91 Feb 16, 2019
dcdbd06
[SPARK-26897][SQL][TEST] Update Spark 2.3.x testing from HiveExternal…
maropu Feb 17, 2019
36902e1
[SPARK-26878] QueryTest.compare() does not handle maps with array key…
ala Feb 18, 2019
e2b8cc6
[SPARK-26897][SQL][TEST][FOLLOW-UP] Remove workaround for 2.2.0 and 2…
maropu Feb 18, 2019
4a4e7ae
[SPARK-26887][SQL][PYTHON][NS] Create datetime.date directly instead …
ueshin Feb 18, 2019
60caa92
[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overw…
rdblue Feb 18, 2019
7f53116
[SPARK-24570][SQL] Implement Spark own GetTablesOperation to fix SQL …
wangyum Feb 18, 2019
8290e5e
[SPARK-26353][SQL] Add typed aggregate functions(max/min) to the exam…
10110346 Feb 18, 2019
59eb34b
[SPARK-26889][SS][DOCS] Fix timestamp type in Structured Streaming + …
gaborgsomogyi Feb 18, 2019
a0e81fc
[SPARK-26744][SPARK-26744][SQL][HOTFOX] Disable schema validation tes…
HyukjinKwon Feb 18, 2019
f85ed9a
[SPARK-26785][SQL] data source v2 API refactor: streaming write
cloud-fan Feb 19, 2019
865c88f
[MINOR][DOC] Add note regarding proper usage of QueryExecution.toRdd
HeartSaVioR Feb 19, 2019
743b73d
[SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash value i…
yucai Feb 19, 2019

@@ -23,12 +23,12 @@ import scala.reflect.runtime.universe
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.util.Utils

private[security] class HBaseDelegationTokenProvider

@@ -26,7 +26,6 @@ import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
@@ -35,6 +34,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.{ThreadUtils, Utils}


@@ -30,6 +30,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.security.HadoopDelegationTokenProvider

private[deploy] class HadoopFSDelegationTokenProvider
extends HadoopDelegationTokenProvider with Logging {

@@ -181,33 +181,47 @@ private[spark] class CoarseGrainedExecutorBackend(

private[spark] object CoarseGrainedExecutorBackend extends Logging {

private def run(
case class Arguments(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
userClassPath: mutable.ListBuffer[URL])

def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
}

def run(
arguments: Arguments,
backendCreateFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend): Unit = {

Utils.initDaemon(log)

SparkHadoopUtil.get.runAsSparkUser { () =>
// Debug code
Utils.checkHost(hostname)
Utils.checkHost(arguments.hostname)

// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
arguments.hostname,
-1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
fetcher.shutdown()

// Create SparkEnv using properties we fetched from the driver.
@@ -225,19 +239,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
arguments.cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
}
}

def main(args: Array[String]) {
def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
@@ -276,24 +289,24 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
printUsageAndExit(classNameForEntry)
}
}

if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
printUsageAndExit(classNameForEntry)
}

run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
userClassPath)
}

private def printUsageAndExit() = {
private def printUsageAndExit(classNameForEntry: String): Unit = {
// scalastyle:off println
System.err.println(
"""
|Usage: CoarseGrainedExecutorBackend [options]
s"""
|Usage: $classNameForEntry [options]
|
| Options are:
| --driver-url <driverUrl>
@@ -307,5 +320,4 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// scalastyle:on println
System.exit(1)
}

}

@@ -15,18 +15,20 @@
* limitations under the License.
*/

package org.apache.spark.deploy.security
package org.apache.spark.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi

/**
* ::DeveloperApi::
* Hadoop delegation token provider.
*/
private[spark] trait HadoopDelegationTokenProvider {
@DeveloperApi
trait HadoopDelegationTokenProvider {

/**
* Name of the service to provide delegation tokens. This name should be unique. Spark will

@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.security.HadoopDelegationTokenProvider

private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
ExceptionThrowingDelegationTokenProvider.constructed = true

25 changes: 18 additions & 7 deletions docs/running-on-yarn.md
@@ -480,6 +480,18 @@ To use a custom metrics.properties for the application master and executors, upd
<td>{{HTTP_SCHEME}}</td>
<td>`http://` or `https://` according to YARN HTTP policy. (Configured via `yarn.http.policy`)</td>
</tr>
<tr>
<td>{{NM_HOST}}</td>
<td>The "host" of the node where the container was run.</td>
</tr>
<tr>
<td>{{NM_PORT}}</td>
<td>The "port" of the node manager where the container was run.</td>
</tr>
<tr>
<td>{{NM_HTTP_PORT}}</td>
<td>The "port" of the node manager's HTTP server where the container was run.</td>
</tr>
<tr>
<td>{{NM_HTTP_ADDRESS}}</td>
<td>Http URI of the node on which the container is allocated.</td>
@@ -502,6 +514,12 @@ To use a custom metrics.properties for the application master and executors, upd
</tr>
</table>

For example, suppose you would like to point the log URL link directly at the Job History Server instead of letting the NodeManager HTTP server redirect it; you can then configure `spark.history.custom.executor.log.url` as below:

`{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096`

NOTE: you need to replace `<JHS_HOST>` and `<JHS_PORT>` with the actual values.
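
A minimal sketch of how this could look in the Spark History Server's `spark-defaults.conf`, assuming a hypothetical Job History Server at `jhs.example.com:19888` (substitute the real host and port):

```
spark.history.custom.executor.log.url  {{HTTP_SCHEME}}jhs.example.com:19888/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096
```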

# Important notes

- Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
@@ -520,13 +538,6 @@ for:
filesystem if `spark.yarn.stagingDir` is not set);
- if Hadoop federation is enabled, all the federated filesystems in the configuration.

The YARN integration also supports custom delegation token providers using the Java Services
mechanism (see `java.util.ServiceLoader`). Implementations of
`org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` can be made available to Spark
by listing their names in the corresponding file in the jar's `META-INF/services` directory. These
providers can be disabled individually by setting `spark.security.credentials.{service}.enabled` to
`false`, where `{service}` is the name of the credential provider.

## YARN-specific Kerberos Configuration

<table class="table">

5 changes: 5 additions & 0 deletions docs/security.md
@@ -756,6 +756,11 @@ If an application needs to interact with other secure Hadoop filesystems, their
explicitly provided to Spark at launch time. This is done by listing them in the
`spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below.
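
A minimal sketch of how that property might be set, with hypothetical namenode addresses; it can equally be passed with `--conf` at launch time:

```
spark.kerberos.access.hadoopFileSystems  hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020
```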

Spark also supports custom delegation token providers using the Java Services
mechanism (see `java.util.ServiceLoader`). Implementations of
`org.apache.spark.security.HadoopDelegationTokenProvider` can be made available to Spark
by listing their names in the corresponding file in the jar's `META-INF/services` directory.
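
A minimal sketch of such a provider, assuming the trait's abstract members match those implemented by Spark's built-in providers (`serviceName`, `delegationTokensRequired`, `obtainDelegationTokens`); the package, class, and config key below are hypothetical:

```scala
package com.example.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.security.HadoopDelegationTokenProvider

class MyServiceDelegationTokenProvider extends HadoopDelegationTokenProvider {

  // Unique provider name; also used in spark.security.credentials.{service}.enabled.
  override def serviceName: String = "my-service"

  // Assumed signature: decide whether tokens should be fetched for this application.
  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = {
    sparkConf.getBoolean("spark.myservice.security.enabled", false)  // hypothetical key
  }

  // Assumed signature: obtain tokens, add them to `creds`, and optionally return
  // the next renewal time in milliseconds (None if no renewal is needed).
  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Token acquisition against the external service would go here.
    None
  }
}
```

The provider is then registered by listing its fully qualified class name in a resource file named `META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider` inside the jar.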

Delegation tokens are currently supported only in YARN and Mesos modes. Consult the
deployment-specific page for more information.


2 changes: 1 addition & 1 deletion docs/structured-streaming-kafka-integration.md
@@ -265,7 +265,7 @@ Each row in the source has the following schema:
</tr>
<tr>
<td>timestamp</td>
<td>long</td>
<td>timestamp</td>
</tr>
<tr>
<td>timestampType</td>

@@ -44,6 +44,12 @@ object SimpleTypedAggregator {
println("running typed average:")
ds.groupByKey(_._1).agg(new TypedAverage[(Long, Long)](_._2.toDouble).toColumn).show()

println("running typed minimum:")
ds.groupByKey(_._1).agg(new TypedMin[(Long, Long)](_._2.toDouble).toColumn).show()

println("running typed maximum:")
ds.groupByKey(_._1).agg(new TypedMax[(Long, Long)](_._2).toColumn).show()

spark.stop()
}
}
@@ -84,3 +90,71 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
}
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

class TypedMin[IN](val f: IN => Double) extends Aggregator[IN, MutableDouble, Option[Double]] {
override def zero: MutableDouble = null
override def reduce(b: MutableDouble, a: IN): MutableDouble = {
if (b == null) {
new MutableDouble(f(a))
} else {
b.value = math.min(b.value, f(a))
b
}
}
override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
if (b1 == null) {
b2
} else if (b2 == null) {
b1
} else {
b1.value = math.min(b1.value, b2.value)
b1
}
}
override def finish(reduction: MutableDouble): Option[Double] = {
if (reduction != null) {
Some(reduction.value)
} else {
None
}
}

override def bufferEncoder: Encoder[MutableDouble] = Encoders.kryo[MutableDouble]
override def outputEncoder: Encoder[Option[Double]] = Encoders.product[Option[Double]]
}

class TypedMax[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, Option[Long]] {
override def zero: MutableLong = null
override def reduce(b: MutableLong, a: IN): MutableLong = {
if (b == null) {
new MutableLong(f(a))
} else {
b.value = math.max(b.value, f(a))
b
}
}
override def merge(b1: MutableLong, b2: MutableLong): MutableLong = {
if (b1 == null) {
b2
} else if (b2 == null) {
b1
} else {
b1.value = math.max(b1.value, b2.value)
b1
}
}
override def finish(reduction: MutableLong): Option[Long] = {
if (reduction != null) {
Some(reduction.value)
} else {
None
}
}

override def bufferEncoder: Encoder[MutableLong] = Encoders.kryo[MutableLong]
override def outputEncoder: Encoder[Option[Long]] = Encoders.product[Option[Long]]
}

class MutableLong(var value: Long) extends Serializable

class MutableDouble(var value: Double) extends Serializable