
Commit 6feaaa4

Merge remote-tracking branch 'upstream/master' into SPARK-31365
2 parents: 9659699 + 5ba467c

275 files changed: +9079 / -3777 lines


R/create-rd.sh

Lines changed: 1 addition & 1 deletion
@@ -34,4 +34,4 @@ pushd "$FWDIR" > /dev/null
 . "$FWDIR/find-r.sh"

 # Generate Rd files if devtools is installed
-"$R_SCRIPT_PATH/Rscript" -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'
+"$R_SCRIPT_PATH/Rscript" -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); setwd("'$FWDIR'"); devtools::document(pkg="./pkg", roclets=c("rd")) }'

R/pkg/R/DataFrame.R

Lines changed: 67 additions & 45 deletions
@@ -1657,9 +1657,7 @@ setMethod("dapplyCollect",
 #'
 #' @param cols grouping columns.
 #' @param func a function to be applied to each group partition specified by grouping
-#' column of the SparkDataFrame. The function \code{func} takes as argument
-#' a key - grouping columns and a data frame - a local R data.frame.
-#' The output of \code{func} is a local R data.frame.
+#' column of the SparkDataFrame. See Details.
 #' @param schema the schema of the resulting SparkDataFrame after the function is applied.
 #' The schema must match to output of \code{func}. It has to be defined for each
 #' output column with preferred output column name and corresponding data type.
@@ -1669,29 +1667,43 @@ setMethod("dapplyCollect",
 #' @aliases gapply,SparkDataFrame-method
 #' @rdname gapply
 #' @name gapply
+#' @details
+#' \code{func} is a function of two arguments. The first, usually named \code{key}
+#' (though this is not enforced) corresponds to the grouping key, will be an
+#' unnamed \code{list} of \code{length(cols)} length-one objects corresponding
+#' to the grouping columns' values for the current group.
+#'
+#' The second, herein \code{x}, will be a local \code{\link{data.frame}} with the
+#' columns of the input not in \code{cols} for the rows corresponding to \code{key}.
+#'
+#' The output of \code{func} must be a \code{data.frame} matching \code{schema} --
+#' in particular this means the names of the output \code{data.frame} are irrelevant
+#'
 #' @seealso \link{gapplyCollect}
 #' @examples
 #'
 #' \dontrun{
-#' Computes the arithmetic mean of the second column by grouping
-#' on the first and third columns. Output the grouping values and the average.
+#' # Computes the arithmetic mean of the second column by grouping
+#' # on the first and third columns. Output the grouping values and the average.
 #'
 #' df <- createDataFrame (
 #' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
 #' c("a", "b", "c", "d"))
 #'
-#' Here our output contains three columns, the key which is a combination of two
-#' columns with data types integer and string and the mean which is a double.
+#' # Here our output contains three columns, the key which is a combination of two
+#' # columns with data types integer and string and the mean which is a double.
 #' schema <- structType(structField("a", "integer"), structField("c", "string"),
 #' structField("avg", "double"))
 #' result <- gapply(
 #' df,
 #' c("a", "c"),
 #' function(key, x) {
+#' # key will either be list(1L, '1') (for the group where a=1L,c='1') or
+#' # list(3L, '3') (for the group where a=3L,c='3')
 #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
 #' }, schema)
 #'
-#' The schema also can be specified in a DDL-formatted string.
+#' # The schema also can be specified in a DDL-formatted string.
 #' schema <- "a INT, c STRING, avg DOUBLE"
 #' result <- gapply(
 #' df,
@@ -1700,8 +1712,8 @@ setMethod("dapplyCollect",
 #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
 #' }, schema)
 #'
-#' We can also group the data and afterwards call gapply on GroupedData.
-#' For Example:
+#' # We can also group the data and afterwards call gapply on GroupedData.
+#' # For example:
 #' gdf <- group_by(df, "a", "c")
 #' result <- gapply(
 #' gdf,
@@ -1710,15 +1722,15 @@ setMethod("dapplyCollect",
 #' }, schema)
 #' collect(result)
 #'
-#' Result
-#' ------
-#' a c avg
-#' 3 3 3.0
-#' 1 1 1.5
+#' # Result
+#' # ------
+#' # a c avg
+#' # 3 3 3.0
+#' # 1 1 1.5
 #'
-#' Fits linear models on iris dataset by grouping on the 'Species' column and
-#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
-#' and 'Petal_Width' as training features.
+#' # Fits linear models on iris dataset by grouping on the 'Species' column and
+#' # using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' # and 'Petal_Width' as training features.
 #'
 #' df <- createDataFrame (iris)
 #' schema <- structType(structField("(Intercept)", "double"),
@@ -1734,12 +1746,12 @@ setMethod("dapplyCollect",
 #' }, schema)
 #' collect(df1)
 #'
-#' Result
-#' ---------
-#' Model (Intercept) Sepal_Width Petal_Length Petal_Width
-#' 1 0.699883 0.3303370 0.9455356 -0.1697527
-#' 2 1.895540 0.3868576 0.9083370 -0.6792238
-#' 3 2.351890 0.6548350 0.2375602 0.2521257
+#' # Result
+#' # ---------
+#' # Model (Intercept) Sepal_Width Petal_Length Petal_Width
+#' # 1 0.699883 0.3303370 0.9455356 -0.1697527
+#' # 2 1.895540 0.3868576 0.9083370 -0.6792238
+#' # 3 2.351890 0.6548350 0.2375602 0.2521257
 #'
 #'}
 #' @note gapply(SparkDataFrame) since 2.0.0
@@ -1757,20 +1769,30 @@ setMethod("gapply",
 #'
 #' @param cols grouping columns.
 #' @param func a function to be applied to each group partition specified by grouping
-#' column of the SparkDataFrame. The function \code{func} takes as argument
-#' a key - grouping columns and a data frame - a local R data.frame.
-#' The output of \code{func} is a local R data.frame.
+#' column of the SparkDataFrame. See Details.
 #' @return A data.frame.
 #' @family SparkDataFrame functions
 #' @aliases gapplyCollect,SparkDataFrame-method
 #' @rdname gapplyCollect
 #' @name gapplyCollect
+#' @details
+#' \code{func} is a function of two arguments. The first, usually named \code{key}
+#' (though this is not enforced) corresponds to the grouping key, will be an
+#' unnamed \code{list} of \code{length(cols)} length-one objects corresponding
+#' to the grouping columns' values for the current group.
+#'
+#' The second, herein \code{x}, will be a local \code{\link{data.frame}} with the
+#' columns of the input not in \code{cols} for the rows corresponding to \code{key}.
+#'
+#' The output of \code{func} must be a \code{data.frame} matching \code{schema} --
+#' in particular this means the names of the output \code{data.frame} are irrelevant
+#'
 #' @seealso \link{gapply}
 #' @examples
 #'
 #' \dontrun{
-#' Computes the arithmetic mean of the second column by grouping
-#' on the first and third columns. Output the grouping values and the average.
+#' # Computes the arithmetic mean of the second column by grouping
+#' # on the first and third columns. Output the grouping values and the average.
 #'
 #' df <- createDataFrame (
 #' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
@@ -1785,8 +1807,8 @@ setMethod("gapply",
 #' y
 #' })
 #'
-#' We can also group the data and afterwards call gapply on GroupedData.
-#' For Example:
+#' # We can also group the data and afterwards call gapply on GroupedData.
+#' # For example:
 #' gdf <- group_by(df, "a", "c")
 #' result <- gapplyCollect(
 #' gdf,
@@ -1796,15 +1818,15 @@ setMethod("gapply",
 #' y
 #' })
 #'
-#' Result
-#' ------
-#' key_a key_c mean_b
-#' 3 3 3.0
-#' 1 1 1.5
+#' # Result
+#' # ------
+#' # key_a key_c mean_b
+#' # 3 3 3.0
+#' # 1 1 1.5
 #'
-#' Fits linear models on iris dataset by grouping on the 'Species' column and
-#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
-#' and 'Petal_Width' as training features.
+#' # Fits linear models on iris dataset by grouping on the 'Species' column and
+#' # using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' # and 'Petal_Width' as training features.
 #'
 #' df <- createDataFrame (iris)
 #' result <- gapplyCollect(
@@ -1816,12 +1838,12 @@ setMethod("gapply",
 #' data.frame(t(coef(m)))
 #' })
 #'
-#' Result
-#'---------
-#' Model X.Intercept. Sepal_Width Petal_Length Petal_Width
-#' 1 0.699883 0.3303370 0.9455356 -0.1697527
-#' 2 1.895540 0.3868576 0.9083370 -0.6792238
-#' 3 2.351890 0.6548350 0.2375602 0.2521257
+#' # Result
+#' # ---------
+#' # Model X.Intercept. Sepal_Width Petal_Length Petal_Width
+#' # 1 0.699883 0.3303370 0.9455356 -0.1697527
+#' # 2 1.895540 0.3868576 0.9083370 -0.6792238
+#' # 3 2.351890 0.6548350 0.2375602 0.2521257
 #'
 #'}
 #' @note gapplyCollect(SparkDataFrame) since 2.0.0

R/pkg/R/functions.R

Lines changed: 0 additions & 1 deletion
@@ -3951,7 +3951,6 @@ setMethod("map_values",
 #' @rdname column_collection_functions
 #' @aliases map_zip_with map_zip_with,characterOrColumn,characterOrColumn,function-method
 #'
-#' @examples
 #' @note map_zip_with since 3.1.0
 setMethod("map_zip_with",
 signature(x = "characterOrColumn", y = "characterOrColumn", f = "function"),

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 9 additions & 14 deletions
@@ -406,17 +406,10 @@ private void handleFailedDelete() {
 *
 * For efficiency, all calls to `next()` will return the same {@link Location} object.
 *
-* If any other lookups or operations are performed on this map while iterating over it, including
-* `lookup()`, the behavior of the returned iterator is undefined.
+* The returned iterator is thread-safe. However if the map is modified while iterating over it,
+* the behavior of the returned iterator is undefined.
 */
 public MapIterator iterator() {
-return new MapIterator(numValues, loc, false);
-}
-
-/**
-* Returns a thread safe iterator that iterates of the entries of this map.
-*/
-public MapIterator safeIterator() {
 return new MapIterator(numValues, new Location(), false);
 }

@@ -427,19 +420,20 @@ public MapIterator safeIterator() {
 *
 * For efficiency, all calls to `next()` will return the same {@link Location} object.
 *
-* If any other lookups or operations are performed on this map while iterating over it, including
-* `lookup()`, the behavior of the returned iterator is undefined.
+* The returned iterator is thread-safe. However if the map is modified while iterating over it,
+* the behavior of the returned iterator is undefined.
 */
 public MapIterator destructiveIterator() {
 updatePeakMemoryUsed();
-return new MapIterator(numValues, loc, true);
+return new MapIterator(numValues, new Location(), true);
 }

 /**
 * Looks up a key, and return a {@link Location} handle that can be used to test existence
 * and read/write values.
 *
-* This function always return the same {@link Location} instance to avoid object allocation.
+* This function always returns the same {@link Location} instance to avoid object allocation.
+* This function is not thread-safe.
 */
 public Location lookup(Object keyBase, long keyOffset, int keyLength) {
 safeLookup(keyBase, keyOffset, keyLength, loc,
@@ -451,7 +445,8 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength) {
 * Looks up a key, and return a {@link Location} handle that can be used to test existence
 * and read/write values.
 *
-* This function always return the same {@link Location} instance to avoid object allocation.
+* This function always returns the same {@link Location} instance to avoid object allocation.
+* This function is not thread-safe.
 */
 public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) {
 safeLookup(keyBase, keyOffset, keyLength, loc, hash);
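
Note: the updated Javadoc reflects the behavioural change in this hunk. Both iterator() and destructiveIterator() now build each MapIterator around a fresh Location (what safeIterator() used to provide), while lookup() keeps reusing the shared cursor and is therefore documented as not thread-safe. A rough Scala sketch of that cursor-reuse trade-off, with made-up Table/Cursor/Entry names rather than Spark's actual classes:

// Sketch only: fresh cursor per iterator vs. one shared cursor for lookup().
final case class Entry(key: String, value: Int)

final class Cursor(entries: Vector[Entry]) {
  private var pos = -1
  def moveTo(key: String): Unit = pos = entries.indexWhere(_.key == key)
  def moveToIndex(i: Int): Unit = pos = i
  def found: Boolean = pos >= 0
  def current: Entry = entries(pos)
}

final class Table(entries: Vector[Entry]) {
  // One shared cursor, reused by every lookup() call: no per-call allocation,
  // but two threads calling lookup() concurrently would clobber each other.
  private val sharedCursor = new Cursor(entries)

  def lookup(key: String): Cursor = {
    sharedCursor.moveTo(key)
    sharedCursor
  }

  // Each call builds its own Cursor, so independent iterations do not interfere
  // (the table itself must still not be modified while iterating).
  def iterator(): Iterator[Entry] = {
    val cursor = new Cursor(entries)
    Iterator.tabulate(entries.length) { i =>
      cursor.moveToIndex(i)
      cursor.current
    }
  }
}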

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@
 #dag-viz-graph .label {
 font-weight: normal;
 text-shadow: none;
+color: #333;
 }

 #dag-viz-graph svg path {

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 4 additions & 15 deletions
@@ -173,8 +173,8 @@ function renderDagViz(forJob) {
 });

 metadataContainer().selectAll(".barrier-rdd").each(function() {
-var rddId = d3.select(this).text().trim()
-var clusterId = VizConstants.clusterPrefix + rddId
+var rddId = d3.select(this).text().trim();
+var clusterId = VizConstants.clusterPrefix + rddId;
 svg.selectAll("g." + clusterId).classed("barrier", true)
 });

@@ -282,11 +282,7 @@ function renderDagVizForJob(svgContainer) {

 /* Render the dot file as an SVG in the given container. */
 function renderDot(dot, container, forJob) {
-var escaped_dot = dot
-.replace(/&lt;/g, "<")
-.replace(/&gt;/g, ">")
-.replace(/&quot;/g, "\"");
-var g = graphlibDot.read(escaped_dot);
+var g = graphlibDot.read(dot);
 var renderer = new dagreD3.render();
 preprocessGraphLayout(g, forJob);
 renderer(container, g);
@@ -498,18 +494,11 @@ function connectRDDs(fromRDDId, toRDDId, edgesContainer, svgContainer) {
 edgesContainer.append("path").datum(points).attr("d", line);
 }

-/*
-* Replace `/n` with `<br/>`
-*/
-function replaceLineBreak(str) {
-return str.replace("\\n", "<br/>");
-}
-
 /* (Job page only) Helper function to add tooltips for RDDs. */
 function addTooltipsForRDDs(svgContainer) {
 svgContainer.selectAll("g.node").each(function() {
 var node = d3.select(this);
-var tooltipText = replaceLineBreak(node.attr("name"));
+var tooltipText = node.attr("name");
 if (tooltipText) {
 node.select("circle")
 .attr("data-toggle", "tooltip")

core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 14 additions & 16 deletions
@@ -20,9 +20,9 @@ package org.apache.spark
 import java.util.{Properties, Timer, TimerTask}

 import scala.collection.JavaConverters._
-import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.{Failure, Success => ScalaSuccess, Try}

 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
@@ -85,28 +85,26 @@ class BarrierTaskContext private[spark] (
 // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
 timeout = new RpcTimeout(365.days, "barrierTimeout"))

-// messages which consist of all barrier tasks' messages
-var messages: Array[String] = null
 // Wait the RPC future to be completed, but every 1 second it will jump out waiting
 // and check whether current spark task is killed. If killed, then throw
 // a `TaskKilledException`, otherwise continue wait RPC until it completes.
-try {
-while (!abortableRpcFuture.toFuture.isCompleted) {
+
+while (!abortableRpcFuture.future.isCompleted) {
+try {
 // wait RPC future for at most 1 second
-try {
-messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 1.second)
-} catch {
-case _: TimeoutException | _: InterruptedException =>
-// If `TimeoutException` thrown, waiting RPC future reach 1 second.
-// If `InterruptedException` thrown, it is possible this task is killed.
-// So in this two cases, we should check whether task is killed and then
-// throw `TaskKilledException`
-taskContext.killTaskIfInterrupted()
+Thread.sleep(1000)
+} catch {
+case _: InterruptedException => // task is killed by driver
+} finally {
+Try(taskContext.killTaskIfInterrupted()) match {
+case ScalaSuccess(_) => // task is still running healthily
+case Failure(e) => abortableRpcFuture.abort(e)
 }
 }
-} finally {
-abortableRpcFuture.abort(taskContext.getKillReason().getOrElse("Unknown reason."))
 }
+// messages which consist of all barrier tasks' messages. The future will return the
+// desired messages if it is completed successfully. Otherwise, exception could be thrown.
+val messages = abortableRpcFuture.future.value.get.get

 barrierEpoch += 1
 logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 1 deletion
@@ -2740,7 +2740,7 @@ object SparkContext extends Logging {
 case "local" => 1
 case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
 case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
-case "yarn" =>
+case "yarn" | SparkMasterRegex.KUBERNETES_REGEX(_) =>
 if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
 conf.getInt(DRIVER_CORES.key, 0)
 } else {
@@ -2885,6 +2885,8 @@ private object SparkMasterRegex {
 val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
 // Regular expression for connecting to Spark deploy clusters
 val SPARK_REGEX = """spark://(.*)""".r
+// Regular expression for connecting to kubernetes clusters
+val KUBERNETES_REGEX = """k8s://(.*)""".r
 }

 /**
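
Note: with the new KUBERNETES_REGEX, a k8s://... master URL takes the same branch as "yarn" when computing driver cores. A small, self-contained Scala sketch of how that regex extractor drives the match; this is a simplified stand-in for SparkContext.numDriverCores, not the real method:

// Same pattern as the diff above, reduced to a standalone example.
object SparkMasterRegexSketch {
  // Regular expression for connecting to kubernetes clusters (as added in the diff).
  val KUBERNETES_REGEX = """k8s://(.*)""".r

  // Simplified: in cluster deploy mode the driver runs remotely, so its core
  // count comes from configuration; in client mode this sketch reports 0.
  def driverCores(master: String, deployMode: String, configuredDriverCores: Int): Int =
    master match {
      case "yarn" | KUBERNETES_REGEX(_) =>
        if (deployMode == "cluster") configuredDriverCores else 0
      case _ => 0
    }

  def main(args: Array[String]): Unit = {
    // "k8s://https://host:6443" matches KUBERNETES_REGEX, so cluster mode
    // picks up the configured driver cores; client mode does not.
    assert(driverCores("k8s://https://host:6443", "cluster", 4) == 4)
    assert(driverCores("k8s://https://host:6443", "client", 4) == 0)
    assert(driverCores("spark://host:7077", "cluster", 4) == 0)
  }
}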
