
Commit 1f58cc1

Merge branch 'master' into ignore-empty-files
2 parents 7057f8b + de42281 commit 1f58cc1


217 files changed, +6655 -4092 lines changed

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -169,6 +169,7 @@ exportMethods("arrange",
 "toJSON",
 "transform",
 "union",
+"unionAll",
 "unionByName",
 "unique",
 "unpersist",

R/pkg/R/DataFrame.R

Lines changed: 14 additions & 2 deletions
@@ -766,7 +766,6 @@ setMethod("repartition",
 #' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
 #' using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
-#'
 #' At least one partition-by expression must be specified.
 #' When no explicit sort order is specified, "ascending nulls first" is assumed.
 #'
@@ -828,7 +827,6 @@ setMethod("repartitionByRange",
 #' toJSON
 #'
 #' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
-#'
 #' Each row is turned into a JSON document with columns as different fields.
 #' The returned SparkDataFrame has a single character column with the name \code{value}
 #'
@@ -2732,6 +2730,20 @@ setMethod("union",
             dataFrame(unioned)
           })
 
+#' Return a new SparkDataFrame containing the union of rows
+#'
+#' This is an alias for `union`.
+#'
+#' @rdname union
+#' @name unionAll
+#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
+#' @note unionAll since 1.4.0
+setMethod("unionAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            union(x, y)
+          })
+
 #' Return a new SparkDataFrame containing the union of rows, matched by column names
 #'
 #' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
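The new SparkR unionAll simply delegates to union, mirroring the alias on the JVM-side Dataset API. As a rough illustration only (not part of this patch), a minimal Java sketch of the equivalent Dataset calls, assuming a local SparkSession and a Spark version that exposes Dataset.unionAll:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UnionAllSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("unionAll-sketch")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> a = spark.range(3).toDF("id");
    Dataset<Row> b = spark.range(3).toDF("id");

    // union() resolves columns by position and keeps duplicate rows;
    // unionAll() is just an alias that delegates to union().
    System.out.println(a.union(b).count());    // 6
    System.out.println(a.unionAll(b).count()); // 6

    spark.stop();
  }
}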

R/pkg/R/generics.R

Lines changed: 3 additions & 0 deletions
@@ -631,6 +631,9 @@ setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
 #' @rdname union
 setGeneric("union", function(x, y) { standardGeneric("union") })
 
+#' @rdname union
+setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
+
 #' @rdname unionByName
 setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })

R/pkg/R/stats.R

Lines changed: 2 additions & 2 deletions
@@ -109,7 +109,7 @@ setMethod("corr",
 #'
 #' Finding frequent items for columns, possibly with false positives.
 #' Using the frequent element count algorithm described in
-#' \url{http://dx.doi.org/10.1145/762471.762473}, proposed by Karp, Schenker, and Papadimitriou.
+#' \url{https://doi.org/10.1145/762471.762473}, proposed by Karp, Schenker, and Papadimitriou.
 #'
 #' @param x A SparkDataFrame.
 #' @param cols A vector column names to search frequent items in.
@@ -143,7 +143,7 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
 #' *exact* rank of x is close to (p * N). More precisely,
 #' floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
 #' This method implements a variation of the Greenwald-Khanna algorithm (with some speed
-#' optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670
+#' optimizations). The algorithm was first present in [[https://doi.org/10.1145/375663.375670
 #' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
 #' Note that NA values will be ignored in numerical columns before calculation. For
 #' columns only containing NA values, an empty list is returned.
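To make the error bound in the documentation above concrete: with N = 1000 rows, p = 0.5 and err = 0.01, the returned value is guaranteed to have rank between floor(0.49 * 1000) = 490 and ceil(0.51 * 1000) = 510. A small illustrative Java sketch against the JVM-side DataFrameStatFunctions API (not part of this patch; assumes a local SparkSession):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ApproxQuantileSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("approx-quantile-sketch")
        .master("local[*]")
        .getOrCreate();

    // 1000 rows with values 0..999 in a column named "value".
    Dataset<Row> df = spark.range(1000).toDF("value");

    // Approximate median with 1% relative error: the result's rank is
    // guaranteed to fall between 490 and 510 out of 1000.
    double[] median = df.stat().approxQuantile("value", new double[]{0.5}, 0.01);
    System.out.println(median[0]);

    spark.stop();
  }
}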

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 1 addition & 0 deletions
@@ -2458,6 +2458,7 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
   expect_equal(count(unioned), 6)
   expect_equal(first(unioned)$name, "Michael")
   expect_equal(count(arrange(suppressWarnings(union(df, df2)), df$age)), 6)
+  expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)
 
   df1 <- select(df2, "age", "name")
   unioned1 <- arrange(unionByName(df1, df), df1$age)

bin/docker-image-tool.sh

Lines changed: 85 additions & 37 deletions
@@ -29,6 +29,20 @@ if [ -z "${SPARK_HOME}" ]; then
 fi
 . "${SPARK_HOME}/bin/load-spark-env.sh"
 
+CTX_DIR="$SPARK_HOME/target/tmp/docker"
+
+function is_dev_build {
+  [ ! -f "$SPARK_HOME/RELEASE" ]
+}
+
+function cleanup_ctx_dir {
+  if is_dev_build; then
+    rm -rf "$CTX_DIR"
+  fi
+}
+
+trap cleanup_ctx_dir EXIT
+
 function image_ref {
   local image="$1"
   local add_repo="${2:-1}"
@@ -53,80 +67,114 @@ function docker_push {
   fi
 }
 
+# Create a smaller build context for docker in dev builds to make the build faster. Docker
+# uploads all of the current directory to the daemon, and it can get pretty big with dev
+# builds that contain test log files and other artifacts.
+#
+# Three build contexts are created, one for each image: base, pyspark, and sparkr. For them
+# to have the desired effect, the docker command needs to be executed inside the appropriate
+# context directory.
+#
+# Note: docker does not support symlinks in the build context.
+function create_dev_build_context {(
+  set -e
+  local BASE_CTX="$CTX_DIR/base"
+  mkdir -p "$BASE_CTX/kubernetes"
+  cp -r "resource-managers/kubernetes/docker/src/main/dockerfiles" \
+    "$BASE_CTX/kubernetes/dockerfiles"
+
+  cp -r "assembly/target/scala-$SPARK_SCALA_VERSION/jars" "$BASE_CTX/jars"
+  cp -r "resource-managers/kubernetes/integration-tests/tests" \
+    "$BASE_CTX/kubernetes/tests"
+
+  mkdir "$BASE_CTX/examples"
+  cp -r "examples/src" "$BASE_CTX/examples/src"
+  # Copy just needed examples jars instead of everything.
+  mkdir "$BASE_CTX/examples/jars"
+  for i in examples/target/scala-$SPARK_SCALA_VERSION/jars/*; do
+    if [ ! -f "$BASE_CTX/jars/$(basename $i)" ]; then
+      cp $i "$BASE_CTX/examples/jars"
+    fi
+  done
+
+  for other in bin sbin data; do
+    cp -r "$other" "$BASE_CTX/$other"
+  done
+
+  local PYSPARK_CTX="$CTX_DIR/pyspark"
+  mkdir -p "$PYSPARK_CTX/kubernetes"
+  cp -r "resource-managers/kubernetes/docker/src/main/dockerfiles" \
+    "$PYSPARK_CTX/kubernetes/dockerfiles"
+  mkdir "$PYSPARK_CTX/python"
+  cp -r "python/lib" "$PYSPARK_CTX/python/lib"
+
+  local R_CTX="$CTX_DIR/sparkr"
+  mkdir -p "$R_CTX/kubernetes"
+  cp -r "resource-managers/kubernetes/docker/src/main/dockerfiles" \
+    "$R_CTX/kubernetes/dockerfiles"
+  cp -r "R" "$R_CTX/R"
+)}
+
+function img_ctx_dir {
+  if is_dev_build; then
+    echo "$CTX_DIR/$1"
+  else
+    echo "$SPARK_HOME"
+  fi
+}
+
 function build {
   local BUILD_ARGS
-  local IMG_PATH
-  local JARS
-
-  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
-    # Set image build arguments accordingly if this is a source repo and not a distribution archive.
-    #
-    # Note that this will copy all of the example jars directory into the image, and that will
-    # contain a lot of duplicated jars with the main Spark directory. In a proper distribution,
-    # the examples directory is cleaned up before generating the distribution tarball, so this
-    # issue does not occur.
-    IMG_PATH=resource-managers/kubernetes/docker/src/main/dockerfiles
-    JARS=assembly/target/scala-$SPARK_SCALA_VERSION/jars
-    BUILD_ARGS=(
-      ${BUILD_PARAMS}
-      --build-arg
-      img_path=$IMG_PATH
-      --build-arg
-      spark_jars=$JARS
-      --build-arg
-      example_jars=examples/target/scala-$SPARK_SCALA_VERSION/jars
-      --build-arg
-      k8s_tests=resource-managers/kubernetes/integration-tests/tests
-    )
-  else
-    # Not passed as arguments to docker, but used to validate the Spark directory.
-    IMG_PATH="kubernetes/dockerfiles"
-    JARS=jars
-    BUILD_ARGS=(${BUILD_PARAMS})
+  local SPARK_ROOT="$SPARK_HOME"
+
+  if is_dev_build; then
+    create_dev_build_context || error "Failed to create docker build context."
+    SPARK_ROOT="$CTX_DIR/base"
   fi
 
   # Verify that the Docker image content directory is present
-  if [ ! -d "$IMG_PATH" ]; then
+  if [ ! -d "$SPARK_ROOT/kubernetes/dockerfiles" ]; then
     error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
   fi
 
   # Verify that Spark has actually been built/is a runnable distribution
   # i.e. the Spark JARs that the Docker files will place into the image are present
-  local TOTAL_JARS=$(ls $JARS/spark-* | wc -l)
+  local TOTAL_JARS=$(ls $SPARK_ROOT/jars/spark-* | wc -l)
   TOTAL_JARS=$(( $TOTAL_JARS ))
   if [ "${TOTAL_JARS}" -eq 0 ]; then
     error "Cannot find Spark JARs. This script assumes that Apache Spark has first been built locally or this is a runnable distribution."
   fi
 
+  local BUILD_ARGS=(${BUILD_PARAMS})
   local BINDING_BUILD_ARGS=(
     ${BUILD_PARAMS}
     --build-arg
    base_img=$(image_ref spark)
   )
-  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+  local BASEDOCKERFILE=${BASEDOCKERFILE:-"kubernetes/dockerfiles/spark/Dockerfile"}
   local PYDOCKERFILE=${PYDOCKERFILE:-false}
   local RDOCKERFILE=${RDOCKERFILE:-false}
 
-  docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
+  (cd $(img_ctx_dir base) && docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
-    -f "$BASEDOCKERFILE" .
+    -f "$BASEDOCKERFILE" .)
   if [ $? -ne 0 ]; then
     error "Failed to build Spark JVM Docker image, please refer to Docker build output for details."
   fi
 
   if [ "${PYDOCKERFILE}" != "false" ]; then
-    docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
+    (cd $(img_ctx_dir pyspark) && docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
       -t $(image_ref spark-py) \
-      -f "$PYDOCKERFILE" .
+      -f "$PYDOCKERFILE" .)
     if [ $? -ne 0 ]; then
      error "Failed to build PySpark Docker image, please refer to Docker build output for details."
    fi
  fi
 
  if [ "${RDOCKERFILE}" != "false" ]; then
-    docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
+    (cd $(img_ctx_dir sparkr) && docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
      -t $(image_ref spark-r) \
-      -f "$RDOCKERFILE" .
+      -f "$RDOCKERFILE" .)
    if [ $? -ne 0 ]; then
      error "Failed to build SparkR Docker image, please refer to Docker build output for details."
    fi

common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java

Lines changed: 8 additions & 4 deletions
@@ -165,10 +165,14 @@ public void writeMinusZeroIsReplacedWithZero() {
     byte[] floatBytes = new byte[Float.BYTES];
     Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
     Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);
-    double doubleFromPlatform = Platform.getDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET);
-    float floatFromPlatform = Platform.getFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET);
 
-    Assert.assertEquals(Double.doubleToLongBits(0.0d), Double.doubleToLongBits(doubleFromPlatform));
-    Assert.assertEquals(Float.floatToIntBits(0.0f), Float.floatToIntBits(floatFromPlatform));
+    byte[] doubleBytes2 = new byte[Double.BYTES];
+    byte[] floatBytes2 = new byte[Float.BYTES];
+    Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, 0.0d);
+    Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, 0.0f);
+
+    // Make sure the bytes we write from 0.0 and -0.0 are same.
+    Assert.assertArrayEquals(doubleBytes, doubleBytes2);
+    Assert.assertArrayEquals(floatBytes, floatBytes2);
   }
 }
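For context on why this test compares raw bytes: 0.0 and -0.0 compare equal as primitives but carry different bit patterns, which is exactly the difference Platform's normalization is meant to remove. A standalone Java sketch (not part of the patch) showing that difference:

public class MinusZeroBits {
  public static void main(String[] args) {
    // Equal under ==, yet the sign bit differs at the binary level.
    System.out.println(0.0d == -0.0d);                   // true
    System.out.println(Double.doubleToLongBits(0.0d));   // 0
    System.out.println(Double.doubleToLongBits(-0.0d));  // -9223372036854775808 (only the sign bit set)
    System.out.println(Float.floatToIntBits(0.0f));      // 0
    System.out.println(Float.floatToIntBits(-0.0f));     // -2147483648 (only the sign bit set)
  }
}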

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 5 additions & 6 deletions
@@ -37,12 +37,11 @@
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
-import org.apache.spark.TaskContext;
-import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.scheduler.MapStatus;
 import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.storage.*;
@@ -79,7 +78,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
-  private final ShuffleWriteMetrics writeMetrics;
+  private final ShuffleWriteMetricsReporter writeMetrics;
   private final int shuffleId;
   private final int mapId;
   private final Serializer serializer;
@@ -103,8 +102,8 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       IndexShuffleBlockResolver shuffleBlockResolver,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
-      TaskContext taskContext,
-      SparkConf conf) {
+      SparkConf conf,
+      ShuffleWriteMetricsReporter writeMetrics) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
@@ -114,7 +113,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
-    this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
+    this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
     this.shuffleBlockResolver = shuffleBlockResolver;
   }
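The net effect of this change (and the matching one in ShuffleExternalSorter below) is that the writer no longer reaches into TaskContext for a concrete ShuffleWriteMetrics; the caller hands it a ShuffleWriteMetricsReporter through the constructor. A deliberately simplified, hypothetical Java sketch of that constructor-injection pattern; the interface and writer below are illustrative stand-ins, not Spark's actual classes:

// Hypothetical stand-ins, for illustration only.
interface WriteMetricsReporter {
  void incRecordsWritten(long count);
  void incBytesWritten(long bytes);
}

final class RecordWriter {
  // Injected by the caller instead of being pulled from a task-level singleton,
  // so tests or alternative shuffle implementations can supply their own reporter.
  private final WriteMetricsReporter metrics;

  RecordWriter(WriteMetricsReporter metrics) {
    this.metrics = metrics;
  }

  void write(byte[] record) {
    // ... write the record to its partition file ...
    metrics.incRecordsWritten(1);
    metrics.incBytesWritten(record.length);
  }
}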

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 12 additions & 6 deletions
@@ -38,6 +38,7 @@
 import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
 import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.DiskBlockObjectWriter;
 import org.apache.spark.storage.FileSegment;
@@ -75,7 +76,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskMemoryManager taskMemoryManager;
   private final BlockManager blockManager;
   private final TaskContext taskContext;
-  private final ShuffleWriteMetrics writeMetrics;
+  private final ShuffleWriteMetricsReporter writeMetrics;
 
   /**
    * Force this sorter to spill when there are this many elements in memory.
@@ -113,7 +114,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       int initialSize,
       int numPartitions,
       SparkConf conf,
-      ShuffleWriteMetrics writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics) {
     super(memoryManager,
       (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
       memoryManager.getTungstenMemoryMode());
@@ -144,7 +145,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
    */
   private void writeSortedFile(boolean isLastFile) {
 
-    final ShuffleWriteMetrics writeMetricsToUse;
+    final ShuffleWriteMetricsReporter writeMetricsToUse;
 
     if (isLastFile) {
       // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
@@ -241,9 +242,14 @@ private void writeSortedFile(boolean isLastFile) {
     //
     // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
     // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
-    // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
-    writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
-    taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
+    // SPARK-3577 tracks the spill time separately.
+
+    // This is guaranteed to be a ShuffleWriteMetrics based on the if check in the beginning
+    // of this method.
+    writeMetrics.incRecordsWritten(
+      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
+    taskContext.taskMetrics().incDiskBytesSpilled(
+      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
   }
 }
