Commit c8c70a9

Merge remote-tracking branch 'upstream/master' into SPARK-22856

2 parents: e530f01 + 2ce37b5

189 files changed: +4391, −2746 lines


R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 2 additions & 0 deletions

@@ -67,6 +67,8 @@ sparkSession <- if (windows_with_hadoop()) {
   sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
 }
 sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+# materialize the catalog implementation
+listTables()
 
 mockLines <- c("{\"name\":\"Michael\"}",
                "{\"name\":\"Andy\", \"age\":30}",

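The added `listTables()` call touches the catalog once so its implementation is materialized before the tests run. For comparison, a minimal Scala sketch of the same warm-up idea (the session setup below is illustrative, not part of this test harness):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, not the SparkR test harness above: start a session and touch
// the catalog once so the catalog implementation is initialized eagerly.
object WarmCatalog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("warm-catalog")
      .getOrCreate()

    // Listing tables forces the catalog to be resolved and instantiated.
    spark.catalog.listTables().show()

    spark.stop()
  }
}
```
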
R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 10 additions & 10 deletions

@@ -46,7 +46,7 @@ Sys.setenv("_JAVA_OPTIONS" = paste("-XX:-UsePerfData", old_java_opt, sep = " "))
 
 ## Overview
 
-SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a distributed data frame implementation that supports data processing operations like selection, filtering, aggregation etc. and distributed machine learning using [MLlib](http://spark.apache.org/mllib/).
+SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a distributed data frame implementation that supports data processing operations like selection, filtering, aggregation etc. and distributed machine learning using [MLlib](https://spark.apache.org/mllib/).
 
 ## Getting Started
 
@@ -132,7 +132,7 @@ sparkR.session.stop()
 
 Different from many other R packages, to use SparkR, you need an additional installation of Apache Spark. The Spark installation will be used to run a backend process that will compile and execute SparkR programs.
 
-After installing the SparkR package, you can call `sparkR.session` as explained in the previous section to start and it will check for the Spark installation. If you are working with SparkR from an interactive shell (eg. R, RStudio) then Spark is downloaded and cached automatically if it is not found. Alternatively, we provide an easy-to-use function `install.spark` for running this manually. If you don't have Spark installed on the computer, you may download it from [Apache Spark Website](http://spark.apache.org/downloads.html).
+After installing the SparkR package, you can call `sparkR.session` as explained in the previous section to start and it will check for the Spark installation. If you are working with SparkR from an interactive shell (eg. R, RStudio) then Spark is downloaded and cached automatically if it is not found. Alternatively, we provide an easy-to-use function `install.spark` for running this manually. If you don't have Spark installed on the computer, you may download it from [Apache Spark Website](https://spark.apache.org/downloads.html).
 
 ```{r, eval=FALSE}
 install.spark()
@@ -147,7 +147,7 @@ sparkR.session(sparkHome = "/HOME/spark")
 ### Spark Session {#SetupSparkSession}
 
 
-In addition to `sparkHome`, many other options can be specified in `sparkR.session`. For a complete list, see [Starting up: SparkSession](http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparksession) and [SparkR API doc](http://spark.apache.org/docs/latest/api/R/sparkR.session.html).
+In addition to `sparkHome`, many other options can be specified in `sparkR.session`. For a complete list, see [Starting up: SparkSession](https://spark.apache.org/docs/latest/sparkr.html#starting-up-sparksession) and [SparkR API doc](https://spark.apache.org/docs/latest/api/R/sparkR.session.html).
 
 In particular, the following Spark driver properties can be set in `sparkConfig`.
 
@@ -169,15 +169,15 @@ sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)
 
 
 #### Cluster Mode
-SparkR can connect to remote Spark clusters. [Cluster Mode Overview](http://spark.apache.org/docs/latest/cluster-overview.html) is a good introduction to different Spark cluster modes.
+SparkR can connect to remote Spark clusters. [Cluster Mode Overview](https://spark.apache.org/docs/latest/cluster-overview.html) is a good introduction to different Spark cluster modes.
 
 When connecting SparkR to a remote Spark cluster, make sure that the Spark version and Hadoop version on the machine match the corresponding versions on the cluster. Current SparkR package is compatible with
 ```{r, echo=FALSE, tidy = TRUE}
 paste("Spark", packageVersion("SparkR"))
 ```
 It should be used both on the local computer and on the remote cluster.
 
-To connect, pass the URL of the master node to `sparkR.session`. A complete list can be seen in [Spark Master URLs](http://spark.apache.org/docs/latest/submitting-applications.html#master-urls).
+To connect, pass the URL of the master node to `sparkR.session`. A complete list can be seen in [Spark Master URLs](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls).
 For example, to connect to a local standalone Spark master, we can call
 
 ```{r, eval=FALSE}
@@ -317,7 +317,7 @@ A common flow of grouping and aggregation is
 
 2. Feed the `GroupedData` object to `agg` or `summarize` functions, with some provided aggregation functions to compute a number within each group.
 
-A number of widely used functions are supported to aggregate data after grouping, including `avg`, `countDistinct`, `count`, `first`, `kurtosis`, `last`, `max`, `mean`, `min`, `sd`, `skewness`, `stddev_pop`, `stddev_samp`, `sumDistinct`, `sum`, `var_pop`, `var_samp`, `var`. See the [API doc for `mean`](http://spark.apache.org/docs/latest/api/R/mean.html) and other `agg_funcs` linked there.
+A number of widely used functions are supported to aggregate data after grouping, including `avg`, `countDistinct`, `count`, `first`, `kurtosis`, `last`, `max`, `mean`, `min`, `sd`, `skewness`, `stddev_pop`, `stddev_samp`, `sumDistinct`, `sum`, `var_pop`, `var_samp`, `var`. See the [API doc for aggregate functions](https://spark.apache.org/docs/latest/api/R/column_aggregate_functions.html) linked there.
 
 For example we can compute a histogram of the number of cylinders in the `mtcars` dataset as shown below.
 
@@ -935,7 +935,7 @@ perplexity
 
 #### Alternating Least Squares
 
-`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](http://dl.acm.org/citation.cfm?id=1608614).
+`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](https://dl.acm.org/citation.cfm?id=1608614).
 
 There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.
 
@@ -1171,11 +1171,11 @@ env | map
 
 ## References
 
-* [Spark Cluster Mode Overview](http://spark.apache.org/docs/latest/cluster-overview.html)
+* [Spark Cluster Mode Overview](https://spark.apache.org/docs/latest/cluster-overview.html)
 
-* [Submitting Spark Applications](http://spark.apache.org/docs/latest/submitting-applications.html)
+* [Submitting Spark Applications](https://spark.apache.org/docs/latest/submitting-applications.html)
 
-* [Machine Learning Library Guide (MLlib)](http://spark.apache.org/docs/latest/ml-guide.html)
+* [Machine Learning Library Guide (MLlib)](https://spark.apache.org/docs/latest/ml-guide.html)
 
 * [SparkR: Scaling R Programs with Spark](https://people.csail.mit.edu/matei/papers/2016/sigmod_sparkr.pdf), Shivaram Venkataraman, Zongheng Yang, Davies Liu, Eric Liang, Hossein Falaki, Xiangrui Meng, Reynold Xin, Ali Ghodsi, Michael Franklin, Ion Stoica, and Matei Zaharia. SIGMOD 2016. June 2016.
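The grouping-and-aggregation flow described in the vignette has a direct counterpart in the Scala Dataset API. A minimal sketch under that assumption, with made-up data rather than the vignette's `mtcars` example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

// Minimal sketch of the group-then-aggregate flow from the vignette, written
// against the Scala Dataset API instead of SparkR. The data is made up.
object GroupAggSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("group-agg").getOrCreate()
    import spark.implicits._

    val cars = Seq(("Mazda RX4", 6), ("Datsun 710", 4), ("Hornet 4 Drive", 6))
      .toDF("model", "cyl")

    // 1. Group the data by the `cyl` column.
    // 2. Feed the grouped data to `agg` with an aggregation function per group.
    val histogram = cars.groupBy("cyl").agg(count("model").as("n"))
    histogram.show()

    spark.stop()
  }
}
```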

core/src/main/resources/org/apache/spark/ui/static/webui.js

Lines changed: 1 addition & 0 deletions

@@ -72,6 +72,7 @@ $(function() {
   collapseTablePageLoad('collapse-aggregated-allActiveStages','aggregated-allActiveStages');
   collapseTablePageLoad('collapse-aggregated-allPendingStages','aggregated-allPendingStages');
   collapseTablePageLoad('collapse-aggregated-allCompletedStages','aggregated-allCompletedStages');
+  collapseTablePageLoad('collapse-aggregated-allSkippedStages','aggregated-allSkippedStages');
   collapseTablePageLoad('collapse-aggregated-allFailedStages','aggregated-allFailedStages');
   collapseTablePageLoad('collapse-aggregated-activeStages','aggregated-activeStages');
   collapseTablePageLoad('collapse-aggregated-pendingOrSkippedStages','aggregated-pendingOrSkippedStages');

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 0 deletions

@@ -76,6 +76,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     val mapSideCombine: Boolean = false)
   extends Dependency[Product2[K, V]] {
 
+  if (mapSideCombine) {
+    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
+  }
   override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
 
   private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
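The new guard follows the common Scala pattern of validating constructor arguments in the class body so an inconsistent combination fails fast. A self-contained sketch of that pattern (the class below is a simplified stand-in, not Spark's `ShuffleDependency`):

```scala
// Simplified stand-in for the guard above, not Spark's actual ShuffleDependency:
// constructor arguments are validated in the class body, so an inconsistent
// combination fails at construction time with a clear message.
class ShuffleSpec[V](
    val aggregator: Option[(V, V) => V],
    val mapSideCombine: Boolean = false) {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
}

object ShuffleSpecDemo extends App {
  // Fine: map-side combine requested and an aggregator is supplied.
  new ShuffleSpec[Int](Some((a: Int, b: Int) => a + b), mapSideCombine = true)

  // Throws IllegalArgumentException: combine requested but no aggregator given.
  new ShuffleSpec[Int](None, mapSideCombine = true)
}
```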

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 8 additions & 7 deletions

@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      to count those failures toward task failure limits
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def killExecutors(
     executorIds: Seq[String],
-    replace: Boolean = false,
+    adjustTargetNumExecutors: Boolean,
+    countFailures: Boolean,
     force: Boolean = false): Seq[String]
 
   /**
@@ -81,7 +81,8 @@
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutor(executorId: String): Boolean = {
-    val killedExecutors = killExecutors(Seq(executorId))
+    val killedExecutors = killExecutors(Seq(executorId), adjustTargetNumExecutors = true,
+      countFailures = false)
     killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
   }
 }
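From a caller's perspective, the change replaces the single `replace` flag with two explicit decisions. A minimal sketch of the new contract (the class below is a made-up stub; only the `killExecutors` signature mirrors the trait change):

```scala
// Made-up stub whose killExecutors signature mirrors the trait change above.
// It only records what was requested; a real client talks to the cluster manager.
class LoggingAllocationClient {
  def killExecutors(
      executorIds: Seq[String],
      adjustTargetNumExecutors: Boolean,
      countFailures: Boolean,
      force: Boolean = false): Seq[String] = {
    println(s"kill=$executorIds adjustTarget=$adjustTargetNumExecutors " +
      s"countFailures=$countFailures force=$force")
    executorIds // pretend the cluster manager acknowledged all of them
  }
}

object ClientDemo extends App {
  val client = new LoggingAllocationClient
  // Dynamic allocation reclaiming an idle executor: the target was already
  // lowered elsewhere, and any killed tasks should not count as failures.
  client.killExecutors(Seq("3"), adjustTargetNumExecutors = false, countFailures = false)
}
```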

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 15 additions & 5 deletions

@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerMaster
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -81,7 +82,8 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
-    conf: SparkConf)
+    conf: SparkConf,
+    blockManagerMaster: BlockManagerMaster)
   extends Logging {
 
   allocationManager =>
@@ -151,7 +153,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener
+  val listener = new ExecutorAllocationListener
 
   // Executor that handles the scheduling task.
   private val executor =
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
 
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
+      // We lower the target number of executors but don't actively kill any yet. Killing is
+      // controlled separately by an idle timeout. It's still helpful to reduce the target number
+      // in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
+      // preempts it) -- in that case, there is no point in trying to immediately get a new
+      // executor, since we wouldn't even use it yet.
       client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
       logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
         s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
@@ -455,7 +462,10 @@ private[spark] class ExecutorAllocationManager(
     val executorsRemoved = if (testing) {
       executorIdsToBeRemoved
     } else {
-      client.killExecutors(executorIdsToBeRemoved)
+      // We don't want to change our target number of executors, because we already did that
+      // when the task backlog decreased.
+      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
+        countFailures = false, force = false)
     }
     // [SPARK-21834] killExecutors api reduces the target number of executors.
     // So we need to update the target with desired value.
@@ -575,7 +585,7 @@ private[spark] class ExecutorAllocationManager(
       // Note that it is not necessary to query the executors since all the cached
       // blocks we are concerned with are reported to the driver. Note that this
       // does not include broadcast blocks.
-      val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+      val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
      val now = clock.getTimeMillis()
      val timeout = {
        if (hasCachedBlocks) {
@@ -610,7 +620,7 @@ private[spark] class ExecutorAllocationManager(
   * This class is intentionally conservative in its assumptions about the relative ordering
   * and consistency of events returned by the listener.
   */
-  private class ExecutorAllocationListener extends SparkListener {
+  private[spark] class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     // Number of running tasks per stage including speculative tasks.
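Passing `BlockManagerMaster` through the constructor instead of reaching for `SparkEnv.get` is a plain dependency-injection move, which also makes the cached-blocks timeout decision testable with a stub. A simplified illustration of that technique (none of these classes are Spark's real ones):

```scala
// Simplified illustration of the refactor above, not Spark's real classes:
// the collaborator is injected through the constructor rather than looked up
// from a global, so the idle-timeout choice can be unit tested with a stub.
trait CachedBlockOracle {
  def hasCachedBlocks(executorId: String): Boolean
}

class IdleTimeoutPolicy(
    oracle: CachedBlockOracle,
    idleTimeoutMs: Long,
    cachedIdleTimeoutMs: Long) {

  // Keep executors that hold cached blocks around longer than plain idle ones.
  def timeoutFor(executorId: String): Long =
    if (oracle.hasCachedBlocks(executorId)) cachedIdleTimeoutMs else idleTimeoutMs
}

object IdleTimeoutDemo extends App {
  val stub = new CachedBlockOracle {
    def hasCachedBlocks(executorId: String): Boolean = executorId == "2"
  }
  val policy = new IdleTimeoutPolicy(stub, idleTimeoutMs = 60000L,
    cachedIdleTimeoutMs = Long.MaxValue)
  println(policy.timeoutFor("1")) // 60000
  println(policy.timeoutFor("2")) // Long.MaxValue, i.e. effectively never
}
```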

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 11 additions & 5 deletions

@@ -520,19 +520,25 @@ private[spark] class SecurityManager(
    *
    * If authentication is disabled, do nothing.
    *
-   * In YARN mode, generate a new secret and store it in the current user's credentials.
+   * In YARN and local mode, generate a new secret and store it in the current user's credentials.
    *
    * In other modes, assert that the auth secret is set in the configuration.
    */
   def initializeAuth(): Unit = {
+    import SparkMasterRegex._
+
     if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
       return
     }
 
-    if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
-      require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
-        s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
-      return
+    val master = sparkConf.get(SparkLauncher.SPARK_MASTER, "")
+    master match {
+      case "yarn" | "local" | LOCAL_N_REGEX(_) | LOCAL_N_FAILURES_REGEX(_, _) =>
+        // Secret generation allowed here
+      case _ =>
+        require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+          s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
+        return
     }
 
     val rnd = new SecureRandom()
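The match above leans on regex extractors for `local[N]`-style master strings. A self-contained sketch of that technique (the regexes here are approximations written for this example, not copied from Spark's `SparkMasterRegex`):

```scala
// Illustration of matching master URLs with regex extractors. These patterns
// are approximations for the example, not Spark's actual SparkMasterRegex values.
object MasterMatchDemo extends App {
  val LocalN = """local\[([0-9]+|\*)\]""".r
  val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def canGenerateSecret(master: String): Boolean = master match {
    case "yarn" | "local" | LocalN(_) | LocalNFailures(_, _) => true // generate in-process
    case _ => false // e.g. standalone or mesos: require a pre-shared secret instead
  }

  println(canGenerateSecret("local[4]"))          // true
  println(canGenerateSecret("local[*, 2]"))       // true
  println(canGenerateSecret("spark://host:7077")) // false
}
```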

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 3 deletions

@@ -534,7 +534,8 @@ class SparkContext(config: SparkConf) extends Logging {
         schedulerBackend match {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
-              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
+              _env.blockManager.master))
           case _ =>
             None
         }
@@ -1633,6 +1634,8 @@
   * :: DeveloperApi ::
   * Request that the cluster manager kill the specified executors.
   *
+  * This is not supported when dynamic allocation is turned on.
+  *
   * @note This is an indication to the cluster manager that the application wishes to adjust
   * its resource usage downwards. If the application wishes to replace the executors it kills
   * through this method with new ones, it should follow up explicitly with a call to
@@ -1644,7 +1647,10 @@
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
+          "killExecutors() unsupported with Dynamic Allocation turned on")
+        b.killExecutors(executorIds, adjustTargetNumExecutors = true, countFailures = false,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false
@@ -1682,7 +1688,8 @@
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
+        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false
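With the new `require`, the developer API `killExecutors` refuses to run when dynamic allocation is enabled. A minimal driver-side usage sketch under that assumption (the executor ID and configuration values here are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal driver-side sketch: with dynamic allocation disabled, the developer
// API can still ask the cluster manager to kill specific executors.
object KillExecutorsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("kill-executors-demo")
      .set("spark.dynamicAllocation.enabled", "false")
    val sc = new SparkContext(conf)

    // Executor IDs are strings assigned by the cluster manager, e.g. "1".
    // The call returns true if the cluster manager acknowledged the request.
    val acknowledged: Boolean = sc.killExecutors(Seq("1"))
    println(s"acknowledged = $acknowledged")

    sc.stop()
  }
}
```

Note that in local mode there is no separate executor process to kill; the snippet only shows the call shape and the configuration flag involved.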

core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 2 additions & 1 deletion

@@ -152,7 +152,8 @@ private[scheduler] class BlacklistTracker (
       case Some(a) =>
         logInfo(s"Killing blacklisted executor id $exec " +
           s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-        a.killExecutors(Seq(exec), true, true)
+        a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+          force = true)
       case None =>
         logWarning(s"Not attempting to kill blacklisted executor id $exec " +
           s"since allocation client is not defined.")

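Replacing the positional `true, true` with named arguments is the usual safeguard against passing a flag to the wrong boolean parameter when a signature grows or changes. A small illustration with a made-up function:

```scala
// Made-up function, used only to illustrate the named-argument style adopted above.
object NamedArgsDemo extends App {
  def restartService(drainFirst: Boolean, notifyUsers: Boolean, force: Boolean = false): Unit =
    println(s"drainFirst=$drainFirst notifyUsers=$notifyUsers force=$force")

  // Positional booleans are easy to transpose and silently break when
  // parameters are added or reordered:
  restartService(true, false)

  // Named arguments keep the call site readable and robust to such changes:
  restartService(drainFirst = true, notifyUsers = false, force = true)
}
```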