
Commit 3d70c1d

Merge branch 'master' of git://git.apache.org/spark into SPARK-15205
2 parents: 766f5c6 + e157647

434 files changed: 8,088 additions and 3,915 deletions


.travis.yml

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Spark provides this Travis CI configuration file to help contributors
+# check Scala/Java style conformance and JDK7/8 compilation easily
+# during their preparing pull requests.
+#   - Scalastyle is executed during `maven install` implicitly.
+#   - Java Checkstyle is executed by `lint-java`.
+# See the related discussion here.
+# https://github.com/apache/spark/pull/12980
+
+# 1. Choose OS (Ubuntu 14.04.3 LTS Server Edition 64bit, ~2 CORE, 7.5GB RAM)
+sudo: required
+dist: trusty
+
+# 2. Choose language and target JDKs for parallel builds.
+language: java
+jdk:
+  - oraclejdk7
+  - oraclejdk8
+
+# 3. Setup cache directory for SBT and Maven.
+cache:
+  directories:
+  - $HOME/.sbt
+  - $HOME/.m2
+
+# 4. Turn off notifications.
+notifications:
+  email: false
+
+# 5. Run maven install before running lint-java.
+install:
+  - export MAVEN_SKIP_RC=1
+  - build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+
+# 6. Run lint-java.
+script:
+  - dev/lint-java

R/pkg/R/sparkR.R

Lines changed: 1 addition & 2 deletions
@@ -103,8 +103,7 @@ sparkR.stop <- function() {
 #'   list(spark.executor.memory="4g"),
 #'   list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
 #'   c("one.jar", "two.jar", "three.jar"),
-#'   c("com.databricks:spark-avro_2.10:2.0.1",
-#'     "com.databricks:spark-csv_2.10:1.3.0"))
+#'   c("com.databricks:spark-avro_2.10:2.0.1"))
 #'}

 sparkR.init <- function(

R/pkg/R/utils.R

Lines changed: 3 additions & 0 deletions
@@ -157,8 +157,11 @@ wrapInt <- function(value) {

 # Multiply `val` by 31 and add `addVal` to the result. Ensures that
 # integer-overflows are handled at every step.
+#
+# TODO: this function does not handle integer overflow well
 mult31AndAdd <- function(val, addVal) {
   vec <- c(bitwShiftL(val, c(4, 3, 2, 1, 0)), addVal)
+  vec[is.na(vec)] <- 0
   Reduce(function(a, b) {
     wrapInt(as.numeric(a) + as.numeric(b))
   },
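Note: mult31AndAdd backs SparkR's hashCode helper (exercised by the new test in test_utils.R below) and mirrors the JVM-style 31-based rolling string hash. A minimal Java sketch of that accumulation; the class and method names are illustrative, not Spark APIs:

// Minimal sketch of the 31-based rolling hash that mult31AndAdd emulates in R.
// In Java, integer arithmetic simply wraps on overflow, which is roughly what
// the R helper reproduces explicitly via wrapInt().
public final class Mult31HashSketch {
  static int mult31Hash(String s) {
    int h = 0;
    for (int i = 0; i < s.length(); i++) {
      h = 31 * h + s.charAt(i);  // overflow wraps around silently
    }
    return h;
  }

  public static void main(String[] args) {
    // Same hex string used by the new "hashCode" test case in test_utils.R.
    System.out.println(mult31Hash("bc53d3605e8a5b7de1e8e271c2317645"));
  }
}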

R/pkg/inst/tests/testthat/test_client.R

Lines changed: 2 additions & 4 deletions
@@ -37,9 +37,7 @@ test_that("multiple packages don't produce a warning", {

 test_that("sparkJars sparkPackages as character vectors", {
   args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
-                                  c("com.databricks:spark-avro_2.10:2.0.1",
-                                    "com.databricks:spark-csv_2.10:1.3.0"))
+                                  c("com.databricks:spark-avro_2.10:2.0.1"))
   expect_match(args, "--jars one.jar,two.jar,three.jar")
-  expect_match(args,
-               "--packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.3.0")
+  expect_match(args, "--packages com.databricks:spark-avro_2.10:2.0.1")
 })

R/pkg/inst/tests/testthat/test_utils.R

Lines changed: 4 additions & 0 deletions
@@ -164,3 +164,7 @@ test_that("convertToJSaveMode", {
   expect_error(convertToJSaveMode("foo"),
                'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
 })
+
+test_that("hashCode", {
+  expect_error(hashCode("bc53d3605e8a5b7de1e8e271c2317645"), NA)
+})

bin/spark-class

Lines changed: 18 additions & 1 deletion
@@ -64,8 +64,25 @@ fi
 # The launcher library will print arguments separated by a NULL character, to allow arguments with
 # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
 # an array that will be used to exec the final command.
+#
+# The exit code of the launcher is appended to the output, so the parent shell removes it from the
+# command array and checks the value to see if the launcher succeeded.
+build_command() {
+  "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
+  printf "%d\0" $?
+}
+
 CMD=()
 while IFS= read -d '' -r ARG; do
   CMD+=("$ARG")
-done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
+done < <(build_command "$@")
+
+COUNT=${#CMD[@]}
+LAST=$((COUNT - 1))
+LAUNCHER_EXIT_CODE=${CMD[$LAST]}
+if [ $LAUNCHER_EXIT_CODE != 0 ]; then
+  exit $LAUNCHER_EXIT_CODE
+fi
+
+CMD=("${CMD[@]:0:$LAST}")
 exec "${CMD[@]}"

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 53 additions & 11 deletions
@@ -68,13 +68,21 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
   private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;

+  private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";
+
   // An entity that manages the shuffle secret per application
   // This is used only if authentication is enabled
   private ShuffleSecretManager secretManager;

   // The actual server that serves shuffle files
   private TransportServer shuffleServer = null;

+  private Configuration _conf = null;
+
+  // The recovery path used to shuffle service recovery
+  @VisibleForTesting
+  Path _recoveryPath = null;
+
   // Handles registering executors and opening shuffle blocks
   @VisibleForTesting
   ExternalShuffleBlockHandler blockHandler;
@@ -112,14 +120,15 @@ private boolean isAuthenticationEnabled() {
    */
   @Override
   protected void serviceInit(Configuration conf) {
+    _conf = conf;

     // In case this NM was killed while there were running spark applications, we need to restore
     // lost state for the existing executors. We look for an existing file in the NM's local dirs.
     // If we don't find one, then we choose a file to use to save the state next time. Even if
     // an application was stopped while the NM was down, we expect yarn to call stopApplication()
     // when it comes back
     registeredExecutorFile =
-      findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
+      new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);

     TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
@@ -190,16 +199,6 @@ public void stopContainer(ContainerTerminationContext context) {
     logger.info("Stopping container {}", containerId);
   }

-  private File findRegisteredExecutorFile(String[] localDirs) {
-    for (String dir: localDirs) {
-      File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
-      if (f.exists()) {
-        return f;
-      }
-    }
-    return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
-  }
-
   /**
    * Close the shuffle server to clean up any associated state.
    */
@@ -222,4 +221,47 @@ protected void serviceStop() {
   public ByteBuffer getMetaData() {
     return ByteBuffer.allocate(0);
   }
+
+  /**
+   * Set the recovery path for shuffle service recovery when NM is restarted. The method will be
+   * overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we
+   * have to manually call this to set our own recovery path.
+   */
+  public void setRecoveryPath(Path recoveryPath) {
+    _recoveryPath = recoveryPath;
+  }
+
+  /**
+   * Get the recovery path, this will override the default one to get our own maintained
+   * recovery path.
+   */
+  protected Path getRecoveryPath() {
+    String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
+    for (String dir : localDirs) {
+      File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+      if (f.exists()) {
+        if (_recoveryPath == null) {
+          // If NM recovery is not enabled, we should specify the recovery path using NM local
+          // dirs, which is compatible with the old code.
+          _recoveryPath = new Path(dir);
+        } else {
+          // If NM recovery is enabled and the recovery file exists in old NM local dirs, which
+          // means old version of Spark already generated the recovery file, we should copy the
+          // old file in to a new recovery path for the compatibility.
+          if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
+            // Fail to move recovery file to new path
+            logger.error("Failed to move recovery file {} to the path {}",
+              RECOVERY_FILE_NAME, _recoveryPath.toString());
+          }
+        }
+        break;
+      }
+    }
+
+    if (_recoveryPath == null) {
+      _recoveryPath = new Path(localDirs[0]);
+    }
+
+    return _recoveryPath;
+  }
 }
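In short, getRecoveryPath() uses the path YARN hands in via setRecoveryPath() when NM recovery is enabled (Hadoop 2.5+), and otherwise falls back to the NM local dirs. A simplified, hypothetical sketch of just that fallback; it omits the migration of a pre-existing recovery file, and the class and field names are illustrative only:

import java.io.File;
import org.apache.hadoop.fs.Path;

// Simplified sketch of the recovery-path resolution described in the javadoc
// above. recoveryPath is non-null only when YARN manages recovery for us;
// otherwise we fall back to the first NM local dir, matching the old layout.
public final class RecoveryPathSketch {
  private static final String DB_NAME = "registeredExecutor.ldb";
  private Path recoveryPath = null;

  public void setRecoveryPath(Path p) {
    recoveryPath = p;
  }

  public File resolveRecoveryFile(String[] nmLocalDirs) {
    if (recoveryPath == null) {
      // NM recovery disabled: keep the LevelDB file under the first local dir.
      recoveryPath = new Path(nmLocalDirs[0]);
    }
    return new File(recoveryPath.toUri().getPath(), DB_NAME);
  }
}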

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 1 addition & 1 deletion
@@ -413,7 +413,7 @@ public long getMemoryConsumptionForThisTask() {
   /**
    * Returns Tungsten memory mode
    */
-  public MemoryMode getTungstenMemoryMode(){
+  public MemoryMode getTungstenMemoryMode() {
     return tungstenMemoryMode;
   }
 }

core/src/main/resources/org/apache/spark/ui/static/timeline-view.js

Lines changed: 2 additions & 2 deletions
@@ -41,7 +41,7 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {
   $(".item.range.job.application-timeline-object").each(function() {
     var getSelectorForJobEntry = function(baseElem) {
       var jobIdText = $($(baseElem).find(".application-timeline-content")[0]).text();
-      var jobId = jobIdText.match("\\(Job (\\d+)\\)")[1];
+      var jobId = jobIdText.match("\\(Job (\\d+)\\)$")[1];
       return "#job-" + jobId;
     };

@@ -113,7 +113,7 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {
   $(".item.range.stage.job-timeline-object").each(function() {
     var getSelectorForStageEntry = function(baseElem) {
       var stageIdText = $($(baseElem).find(".job-timeline-content")[0]).text();
-      var stageIdAndAttempt = stageIdText.match("\\(Stage (\\d+\\.\\d+)\\)")[1].split(".");
+      var stageIdAndAttempt = stageIdText.match("\\(Stage (\\d+\\.\\d+)\\)$")[1].split(".");
       return "#stage-" + stageIdAndAttempt[0] + "-" + stageIdAndAttempt[1];
     };
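Both hunks only add a trailing $ anchor, so the job/stage id is taken from the label at the end of the element text rather than from a look-alike substring earlier in it. A hypothetical Java sketch of the difference the anchor makes (the Spark code itself is JavaScript; the sample text is invented):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrates why the "$" anchor matters: if the description itself contains a
// "(Job N)"-looking substring, the unanchored pattern grabs the wrong id, while
// the anchored one only accepts the label at the very end of the text.
public final class AnchorSketch {
  public static void main(String[] args) {
    String text = "count at (Job 7) example (Job 42)";

    Matcher loose = Pattern.compile("\\(Job (\\d+)\\)").matcher(text);
    Matcher anchored = Pattern.compile("\\(Job (\\d+)\\)$").matcher(text);

    if (loose.find() && anchored.find()) {
      System.out.println("unanchored: " + loose.group(1));    // prints 7
      System.out.println("anchored:   " + anchored.group(1)); // prints 42
    }
  }
}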

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 5 additions & 21 deletions
@@ -31,7 +31,6 @@ import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -61,10 +60,8 @@ class SparkEnv (
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
-    val blockTransferService: BlockTransferService,
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
-    val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
     val memoryManager: MemoryManager,
     val outputCommitCoordinator: OutputCommitCoordinator,
@@ -77,7 +74,7 @@ class SparkEnv (
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

-  private var driverTmpDirToDelete: Option[String] = None
+  private[spark] var driverTmpDir: Option[String] = None

   private[spark] def stop() {

@@ -94,13 +91,10 @@ class SparkEnv (
     rpcEnv.shutdown()
     rpcEnv.awaitTermination()

-    // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
     // If we only stop sc, but the driver process still run as a services then we need to delete
     // the tmp dir, if not, it will create too many tmp dirs.
-    // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
-    // current working dir in executor which we do not need to delete.
-    driverTmpDirToDelete match {
+    // We only need to delete the tmp dir create by driver
+    driverTmpDir match {
       case Some(path) =>
         try {
           Utils.deleteRecursively(new File(path))
@@ -342,15 +336,6 @@ object SparkEnv extends Logging {
       ms
     }

-    // Set the sparkFiles directory, used when downloading dependencies. In local mode,
-    // this is a temporary directory; in distributed mode, this is the executor's current working
-    // directory.
-    val sparkFilesDir: String = if (isDriver) {
-      Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
-    } else {
-      "."
-    }
-
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
       new OutputCommitCoordinator(conf, isDriver)
     }
@@ -367,10 +352,8 @@ object SparkEnv extends Logging {
       mapOutputTracker,
       shuffleManager,
       broadcastManager,
-      blockTransferService,
       blockManager,
       securityManager,
-      sparkFilesDir,
       metricsSystem,
       memoryManager,
       outputCommitCoordinator,
@@ -380,7 +363,8 @@ object SparkEnv extends Logging {
     // called, and we only need to do it for driver. Because driver may run as a service, and if we
     // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
     if (isDriver) {
-      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
+      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
+      envInstance.driverTmpDir = Some(sparkFilesDir)
     }

     envInstance