Commit 7590a25

Author: Davies Liu (committed)

Merge branch 'master' of github.com:apache/spark into semijoin

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala

2 parents: 2d4085b + e1e0587

233 files changed: +8935 −2552 lines


R/install-dev.sh

Lines changed: 0 additions & 4 deletions
```diff
@@ -42,8 +42,4 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
-# Zip the SparkR package so that it can be distributed to worker nodes on YARN
-cd $LIB_DIR
-jar cfM "$LIB_DIR/sparkr.zip" SparkR
-
 popd > /dev/null
```
Lines changed: 30 additions & 0 deletions
```diff
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+library(sparkPackageTest)
+
+sc <- sparkR.init()
+
+run1 <- myfunc(5L)
+
+run2 <- myfunc(-4L)
+
+sparkR.stop()
+
+if(run1 != 6) quit(save = "no", status = 1)
+
+if(run2 != -3) quit(save = "no", status = 1)
```

build/mvn

Lines changed: 7 additions & 7 deletions
```diff
@@ -51,11 +51,11 @@ install_app() {
   # check if we have curl installed
   # download application
   [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \
-    echo "exec: curl ${curl_opts} ${remote_tarball}" && \
+    echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \
     curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
   # if the file still doesn't exist, lets try `wget` and cross our fingers
   [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \
-    echo "exec: wget ${wget_opts} ${remote_tarball}" && \
+    echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \
     wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
   # if both were unsuccessful, exit
   [ ! -f "${local_tarball}" ] && \
@@ -82,7 +82,7 @@ install_mvn() {
 # Install zinc under the build/ folder
 install_zinc() {
   local zinc_path="zinc-0.3.5.3/bin/zinc"
-  [ ! -f "${zinc_path}" ] && ZINC_INSTALL_FLAG=1
+  [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
   install_app \
     "http://downloads.typesafe.com/zinc/0.3.5.3" \
     "zinc-0.3.5.3.tgz" \
@@ -135,9 +135,9 @@ cd "${_CALLING_DIR}"
 
 # Now that zinc is ensured to be installed, check its status and, if its
 # not running or just installed, start it
-if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
+if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status -port ${ZINC_PORT}`" ]; then
   export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
-  ${ZINC_BIN} -shutdown
+  ${ZINC_BIN} -shutdown -port ${ZINC_PORT}
   ${ZINC_BIN} -start -port ${ZINC_PORT} \
     -scala-compiler "${SCALA_COMPILER}" \
     -scala-library "${SCALA_LIBRARY}" &>/dev/null
@@ -146,7 +146,7 @@ fi
 # Set any `mvn` options if not already present
 export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
 
-echo "Using \`mvn\` from path: $MVN_BIN"
+echo "Using \`mvn\` from path: $MVN_BIN" 1>&2
 
 # Last, call the `mvn` command as usual
-${MVN_BIN} "$@"
+${MVN_BIN} -DzincPort=${ZINC_PORT} "$@"
```
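The pattern across these hunks: the script's own status messages (`echo "exec: curl ..."`, `echo "Using \`mvn\` ..."`) move to stderr so a caller capturing `build/mvn`'s stdout sees only what `mvn` itself prints, and every zinc invocation (`-status`, `-shutdown`, `-start`) is pinned to the same `${ZINC_PORT}`. A toy Java illustration of the stdout/stderr split (mine, not from the commit):

```java
public class StderrDemo {
  public static void main(String[] args) {
    // Status chatter goes to stderr, mirroring `echo "exec: curl ..." 1>&2`
    // in build/mvn: visible on a terminal, but not captured by command
    // substitution.
    System.err.println("exec: downloading tool...");

    // Only the real payload goes to stdout; a caller running
    // `out=$(java StderrDemo)` captures just this line.
    System.out.println("payload");
  }
}
```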

core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java

Lines changed: 11 additions & 8 deletions
```diff
@@ -23,11 +23,13 @@
 // See
 // http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
 abstract class JavaSparkContextVarargsWorkaround {
-  public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
+
+  @SafeVarargs
+  public final <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
     if (rdds.length == 0) {
       throw new IllegalArgumentException("Union called on empty list");
     }
-    ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
+    List<JavaRDD<T>> rest = new ArrayList<>(rdds.length - 1);
     for (int i = 1; i < rdds.length; i++) {
       rest.add(rdds[i]);
     }
@@ -38,26 +40,27 @@ public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
     if (rdds.length == 0) {
       throw new IllegalArgumentException("Union called on empty list");
     }
-    ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
+    List<JavaDoubleRDD> rest = new ArrayList<>(rdds.length - 1);
     for (int i = 1; i < rdds.length; i++) {
       rest.add(rdds[i]);
     }
     return union(rdds[0], rest);
   }
 
-  public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
+  @SafeVarargs
+  public final <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
     if (rdds.length == 0) {
       throw new IllegalArgumentException("Union called on empty list");
     }
-    ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
+    List<JavaPairRDD<K, V>> rest = new ArrayList<>(rdds.length - 1);
     for (int i = 1; i < rdds.length; i++) {
       rest.add(rdds[i]);
     }
     return union(rdds[0], rest);
   }
 
   // These methods take separate "first" and "rest" elements to avoid having the same type erasure
-  abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
-  abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
-  abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
+  public abstract <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
+  public abstract JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
+  public abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
 }
```
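Beyond the `List`/diamond cleanups and the `abstract public` → `public abstract` modifier-order fix, the substantive change is marking the generic varargs overloads `final` so `@SafeVarargs` can be applied: Java only permits that annotation on methods that cannot be overridden (constructors, static and final methods, plus private methods since Java 9), and it suppresses the "unchecked generic array creation" warning at call sites. A self-contained sketch of the same pattern; the `listOf` helper is hypothetical, not from the commit:

```java
import java.util.ArrayList;
import java.util.List;

public class SafeVarargsDemo {
  // Varargs of a generic type T are backed by an Object[] after erasure, so
  // every call site would normally warn about unchecked generic array
  // creation. @SafeVarargs is legal here only because the method is static
  // (i.e. non-overridable) -- the same reason the commit adds `final` to the
  // union(...) overloads alongside the annotation.
  @SafeVarargs
  public static <T> List<T> listOf(T... items) {
    List<T> result = new ArrayList<>(items.length);
    for (T item : items) {
      result.add(item); // only reads the array; never stores a foreign type
    }
    return result;
  }

  public static void main(String[] args) {
    // Compiles without warnings thanks to @SafeVarargs on listOf.
    List<List<String>> nested = listOf(listOf("a"), listOf("b", "c"));
    System.out.println(nested); // [[a], [b, c]]
  }
}
```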

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java

Lines changed: 36 additions & 0 deletions
```diff
@@ -20,15 +20,19 @@
 import com.google.common.primitives.UnsignedLongs;
 
 import org.apache.spark.annotation.Private;
+import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.util.Utils;
+import static org.apache.spark.unsafe.PlatformDependent.BYTE_ARRAY_OFFSET;
 
 @Private
 public class PrefixComparators {
   private PrefixComparators() {}
 
   public static final StringPrefixComparator STRING = new StringPrefixComparator();
   public static final StringPrefixComparatorDesc STRING_DESC = new StringPrefixComparatorDesc();
+  public static final BinaryPrefixComparator BINARY = new BinaryPrefixComparator();
+  public static final BinaryPrefixComparatorDesc BINARY_DESC = new BinaryPrefixComparatorDesc();
   public static final LongPrefixComparator LONG = new LongPrefixComparator();
   public static final LongPrefixComparatorDesc LONG_DESC = new LongPrefixComparatorDesc();
   public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
@@ -52,6 +56,38 @@ public int compare(long bPrefix, long aPrefix) {
     }
   }
 
+  public static final class BinaryPrefixComparator extends PrefixComparator {
+    @Override
+    public int compare(long aPrefix, long bPrefix) {
+      return UnsignedLongs.compare(aPrefix, bPrefix);
+    }
+
+    public static long computePrefix(byte[] bytes) {
+      if (bytes == null) {
+        return 0L;
+      } else {
+        /**
+         * TODO: If a wrapper for BinaryType is created (SPARK-8786),
+         * these codes below will be in the wrapper class.
+         */
+        final int minLen = Math.min(bytes.length, 8);
+        long p = 0;
+        for (int i = 0; i < minLen; ++i) {
+          p |= (128L + PlatformDependent.UNSAFE.getByte(bytes, BYTE_ARRAY_OFFSET + i))
+              << (56 - 8 * i);
+        }
+        return p;
+      }
+    }
+  }
+
+  public static final class BinaryPrefixComparatorDesc extends PrefixComparator {
+    @Override
+    public int compare(long bPrefix, long aPrefix) {
+      return UnsignedLongs.compare(aPrefix, bPrefix);
+    }
+  }
+
   public static final class LongPrefixComparator extends PrefixComparator {
     @Override
     public int compare(long a, long b) {
```
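To see what the new `computePrefix` does, here is a standalone restatement: plain array indexing stands in for Spark's internal `PlatformDependent.UNSAFE` byte access, and `Long.compareUnsigned` stands in for Guava's `UnsignedLongs.compare` (both substitutions are mine, so the sketch runs outside Spark). It packs up to the first eight bytes of the array big-endian into one `long`, biasing each signed byte by 128 into `[0, 255]` so the packed values order correctly under unsigned comparison; arrays that agree on their first eight bytes get equal prefixes, which the surrounding sort is expected to resolve with a full record comparison.

```java
public class BinaryPrefixSketch {

  // Restatement of BinaryPrefixComparator.computePrefix: bytes[i] replaces
  // PlatformDependent.UNSAFE.getByte(bytes, BYTE_ARRAY_OFFSET + i).
  static long computePrefix(byte[] bytes) {
    if (bytes == null) {
      return 0L;
    }
    final int minLen = Math.min(bytes.length, 8);
    long p = 0;
    for (int i = 0; i < minLen; ++i) {
      // 128L + bytes[i] lies in [0, 255]; shifts of 56, 48, ..., 0 place the
      // biased bytes into the long from the most significant byte down.
      p |= (128L + bytes[i]) << (56 - 8 * i);
    }
    return p;
  }

  public static void main(String[] args) {
    byte[] a = {1, 2, 3};
    byte[] b = {1, 2, 4};
    // Unsigned comparison of the prefixes agrees with comparing the arrays
    // byte by byte: {1, 2, 3} sorts before {1, 2, 4}.
    System.out.println(Long.compareUnsigned(computePrefix(a), computePrefix(b)) < 0); // true
  }
}
```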
