
Commit 4564bc5

Merge branch 'master' into linkMasterPage
# Conflicts:
#	project/MimaExcludes.scala

2 parents: 0192b37 + e679bc3

215 files changed: 3,561 additions and 4,771 deletions


common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 4 additions & 0 deletions
@@ -60,6 +60,10 @@ public TransportConf(String module, ConfigProvider conf) {
     SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
   }
 
+  public int getInt(String name, int defaultValue) {
+    return conf.getInt(name, defaultValue);
+  }
+
   private String getConfKey(String suffix) {
     return "spark." + module + "." + suffix;
   }
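The new accessor lets callers read an integer setting with a fallback (see the spark.shuffle.service.index.cache.entries lookup in the next file). A minimal sketch of the lookup-with-default behaviour, using a plain Map as a hypothetical stand-in for Spark's ConfigProvider:

import java.util.HashMap;
import java.util.Map;

public class GetIntSketch {
  // Hypothetical helper mirroring the added TransportConf.getInt: fall back when the key is unset.
  static int getInt(Map<String, String> settings, String name, int defaultValue) {
    String value = settings.get(name);
    return value == null ? defaultValue : Integer.parseInt(value);
  }

  public static void main(String[] args) {
    Map<String, String> settings = new HashMap<>();
    System.out.println(getInt(settings, "spark.shuffle.service.index.cache.entries", 1024)); // 1024
    settings.put("spark.shuffle.service.index.cache.entries", "2048");
    System.out.println(getInt(settings, "spark.shuffle.service.index.cache.entries", 1024)); // 2048
  }
}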

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 24 additions & 12 deletions
@@ -21,6 +21,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
@@ -29,6 +30,9 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
@@ -66,6 +70,12 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
 
+  /**
+   * Caches index file information so that we can avoid open/close the index files
+   * for each block fetch.
+   */
+  private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
+
   // Single-threaded Java executor used to perform expensive recursive directory deletion.
   private final Executor directoryCleaner;
 
@@ -95,6 +105,15 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
+    int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+    CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
+        new CacheLoader<File, ShuffleIndexInformation>() {
+          public ShuffleIndexInformation load(File file) throws IOException {
+            return new ShuffleIndexInformation(file);
+          }
+        };
+    shuffleIndexCache = CacheBuilder.newBuilder()
+      .maximumSize(indexCacheEntries).build(indexCacheLoader);
     if (registeredExecutorFile != null) {
       Options options = new Options();
       options.createIfMissing(false);
@@ -265,24 +284,17 @@ private ManagedBuffer getSortBasedShuffleBlockData(
     File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
       "shuffle_" + shuffleId + "_" + mapId + "_0.index");
 
-    DataInputStream in = null;
     try {
-      in = new DataInputStream(new FileInputStream(indexFile));
-      in.skipBytes(reduceId * 8);
-      long offset = in.readLong();
-      long nextOffset = in.readLong();
+      ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
+      ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
       return new FileSegmentManagedBuffer(
         conf,
         getFile(executor.localDirs, executor.subDirsPerLocalDir,
          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
-        offset,
-        nextOffset - offset);
-    } catch (IOException e) {
+        shuffleIndexRecord.getOffset(),
+        shuffleIndexRecord.getLength());
+    } catch (ExecutionException e) {
       throw new RuntimeException("Failed to open file: " + indexFile, e);
-    } finally {
-      if (in != null) {
-        JavaUtils.closeQuietly(in);
-      }
     }
   }
 
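The last hunk swaps per-fetch open/skip/read of the .index file for a lookup in a Guava LoadingCache keyed by File and bounded by spark.shuffle.service.index.cache.entries. A self-contained sketch of the same caching pattern, with a toy String loader standing in for ShuffleIndexInformation (only the cache size and the CacheBuilder/CacheLoader usage mirror the diff):

import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class IndexCacheSketch {
  public static void main(String[] args) throws ExecutionException {
    // The loader runs only on a cache miss; later get() calls for the same key are served
    // from memory until the size-bounded cache evicts the entry.
    CacheLoader<String, String> indexCacheLoader = new CacheLoader<String, String>() {
      @Override
      public String load(String indexFileName) {
        System.out.println("loading " + indexFileName);
        return "parsed index for " + indexFileName; // stands in for new ShuffleIndexInformation(file)
      }
    };
    LoadingCache<String, String> shuffleIndexCache =
      CacheBuilder.newBuilder().maximumSize(1024).build(indexCacheLoader);

    // First access loads, second is a hit. get() wraps loader failures in ExecutionException,
    // which is why the catch block above changes from IOException to ExecutionException.
    System.out.println(shuffleIndexCache.get("shuffle_0_1_0.index"));
    System.out.println(shuffleIndexCache.get("shuffle_0_1_0.index"));
  }
}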

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+/**
+ * Keeps the index information for a particular map output
+ * as an in-memory LongBuffer.
+ */
+public class ShuffleIndexInformation {
+  /** offsets as long buffer */
+  private final LongBuffer offsets;
+
+  public ShuffleIndexInformation(File indexFile) throws IOException {
+    int size = (int)indexFile.length();
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    offsets = buffer.asLongBuffer();
+    DataInputStream dis = null;
+    try {
+      dis = new DataInputStream(new FileInputStream(indexFile));
+      dis.readFully(buffer.array());
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
+    }
+  }
+
+  /**
+   * Get index offset for a particular reducer.
+   */
+  public ShuffleIndexRecord getIndex(int reduceId) {
+    long offset = offsets.get(reduceId);
+    long nextOffset = offsets.get(reduceId + 1);
+    return new ShuffleIndexRecord(offset, nextOffset - offset);
+  }
+}
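The class assumes the .index file is nothing but a run of 8-byte longs: one cumulative offset per reducer plus a trailing end offset, so record i covers the byte range [offsets(i), offsets(i+1)) of the .data file. A small sketch of that layout, writing a throwaway index for three reducers and reading it back the way the constructor does (file name and offsets are made up):

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.nio.file.Files;

public class IndexLayoutSketch {
  public static void main(String[] args) throws IOException {
    // An index for 3 reducers holds 4 longs: cumulative offsets into the .data file.
    long[] offsets = {0L, 100L, 250L, 400L};

    File indexFile = File.createTempFile("shuffle_0_0_0", ".index");
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(indexFile))) {
      for (long offset : offsets) {
        out.writeLong(offset); // big-endian, matching the default byte order of the ByteBuffer above
      }
    }

    // Same idea as the constructor: slurp the whole file and view it as longs.
    LongBuffer buf = ByteBuffer.wrap(Files.readAllBytes(indexFile.toPath())).asLongBuffer();
    int reduceId = 1;
    long offset = buf.get(reduceId);
    long length = buf.get(reduceId + 1) - offset;
    System.out.println("reducer " + reduceId + ": offset=" + offset + ", length=" + length); // 100, 150

    indexFile.delete();
  }
}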
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+/**
+ * Contains offset and length of the shuffle block data.
+ */
+public class ShuffleIndexRecord {
+  private final long offset;
+  private final long length;
+
+  public ShuffleIndexRecord(long offset, long length) {
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+}
+

conf/metrics.properties.template

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@
 #   period    10        Poll period
 #   unit      seconds   Unit of the poll period
 #   ttl       1         TTL of messages sent by Ganglia
+#   dmax      0         Lifetime in seconds of metrics (0 never expired)
 #   mode      multicast Ganglia network mode ('unicast' or 'multicast')
 
 # org.apache.spark.metrics.sink.JmxSink

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 20 deletions
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 
 import scala.collection.JavaConverters._
@@ -262,8 +262,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def env: SparkEnv = _env
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
-  private[spark] val addedFiles = HashMap[String, Long]()
-  private[spark] val addedJars = HashMap[String, Long]()
+  private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
+  private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = {
@@ -1430,14 +1430,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       schemeCorrectedPath
     }
     val timestamp = System.currentTimeMillis
-    addedFiles(key) = timestamp
-
-    // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration, timestamp, useCache = false)
-
-    logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
-    postEnvironmentUpdate()
+    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+      logInfo(s"Added file $path at $key with timestamp $timestamp")
+      // Fetch the file locally so that closures which are run on the driver can still use the
+      // SparkFiles API to access files.
+      Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+        hadoopConfiguration, timestamp, useCache = false)
+      postEnvironmentUpdate()
+    }
   }
 
   /**
@@ -1705,12 +1705,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         case exc: FileNotFoundException =>
          logError(s"Jar not found at $path")
          null
-        case e: Exception =>
-          // For now just log an error but allow to go through so spark examples work.
-          // The spark examples don't really need the jar distributed since its also
-          // the app jar.
-          logError("Error adding jar (" + e + "), was the --addJars option used?")
-          null
       }
     }
     // A JAR file which exists locally on every worker node
@@ -1721,11 +1715,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         }
       }
       if (key != null) {
-        addedJars(key) = System.currentTimeMillis
-        logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+        val timestamp = System.currentTimeMillis
+        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+          postEnvironmentUpdate()
+        }
       }
     }
-    postEnvironmentUpdate()
   }
 
   /**
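The addFile/addJar hunks lean on the first-writer-wins contract of ConcurrentHashMap.putIfAbsent: only the caller that actually inserted the key performs the one-time work (logging, fetching the file, posting the environment update). The Scala view obtained via .asScala surfaces the same contract as an Option, hence the .isEmpty checks. A minimal Java sketch of that contract, with an illustrative key and timestamps:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentSketch {
  public static void main(String[] args) {
    ConcurrentMap<String, Long> addedFiles = new ConcurrentHashMap<>();
    String key = "spark://driver:7077/files/data.txt"; // illustrative key, not a real URL

    // First registration: putIfAbsent returns null, so this caller does the one-time work.
    if (addedFiles.putIfAbsent(key, 1L) == null) {
      System.out.println("added " + key + "; fetch the file and post an environment update");
    }

    // Duplicate registration (addFile called twice with the same path): the existing value is
    // returned, the map is unchanged, and the side effects are skipped.
    if (addedFiles.putIfAbsent(key, 2L) == null) {
      System.out.println("not reached");
    }
    System.out.println("timestamp stays " + addedFiles.get(key)); // prints 1
  }
}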

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 25 additions & 12 deletions
@@ -474,12 +474,17 @@ abstract class RDD[T: ClassTag](
   def sample(
       withReplacement: Boolean,
       fraction: Double,
-      seed: Long = Utils.random.nextLong): RDD[T] = withScope {
-    require(fraction >= 0.0, "Negative fraction value: " + fraction)
-    if (withReplacement) {
-      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
-    } else {
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
+      seed: Long = Utils.random.nextLong): RDD[T] = {
+    require(fraction >= 0,
+      s"Fraction must be nonnegative, but got ${fraction}")
+
+    withScope {
+      require(fraction >= 0.0, "Negative fraction value: " + fraction)
+      if (withReplacement) {
+        new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
+      } else {
+        new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
+      }
     }
   }
 
@@ -493,14 +498,22 @@ abstract class RDD[T: ClassTag](
    */
   def randomSplit(
       weights: Array[Double],
-      seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
-    val sum = weights.sum
-    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
-    normalizedCumWeights.sliding(2).map { x =>
-      randomSampleWithRange(x(0), x(1), seed)
-    }.toArray
+      seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
+    require(weights.forall(_ >= 0),
+      s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
+    require(weights.sum > 0,
+      s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
+
+    withScope {
+      val sum = weights.sum
+      val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
+      normalizedCumWeights.sliding(2).map { x =>
+        randomSampleWithRange(x(0), x(1), seed)
+      }.toArray
+    }
   }
 
+
   /**
    * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability
    * range.
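randomSplit turns the weights into cumulative, normalized boundaries and samples each adjacent [lower, upper) pair; the new require calls reject exactly the inputs that would break that arithmetic (a negative weight, or a zero sum that would divide by zero). A small worked sketch of the boundary computation, with made-up weights:

import java.util.Arrays;

public class RandomSplitWeightsSketch {
  public static void main(String[] args) {
    double[] weights = {1.0, 2.0, 1.0}; // illustrative weights

    // Mirrors the added requires: nonnegative weights and a positive sum.
    double sum = Arrays.stream(weights).sum();
    if (Arrays.stream(weights).anyMatch(w -> w < 0) || sum <= 0) {
      throw new IllegalArgumentException("Weights must be nonnegative with a positive sum");
    }

    // Equivalent of weights.map(_ / sum).scanLeft(0.0)(_ + _): boundaries 0.0, 0.25, 0.75, 1.0.
    double[] bounds = new double[weights.length + 1];
    for (int i = 0; i < weights.length; i++) {
      bounds[i + 1] = bounds[i] + weights[i] / sum;
    }

    // Equivalent of sliding(2): each adjacent pair is one split's probability range.
    for (int i = 0; i < weights.length; i++) {
      System.out.printf("split %d samples the range [%.2f, %.2f)%n", i, bounds[i], bounds[i + 1]);
    }
  }
}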

core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala

Lines changed: 5 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rpc.netty
 
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
@@ -42,8 +42,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     val inbox = new Inbox(ref, endpoint)
   }
 
-  private val endpoints = new ConcurrentHashMap[String, EndpointData]
-  private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
+  private val endpoints: ConcurrentMap[String, EndpointData] =
+    new ConcurrentHashMap[String, EndpointData]
+  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
+    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
 
   // Track the receivers whose inboxes may contain messages.
   private val receivers = new LinkedBlockingQueue[EndpointData]
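This hunk only widens the declared field types from the concrete ConcurrentHashMap to the ConcurrentMap interface; behaviour is unchanged, but callers can no longer rely on implementation-specific API. A trivial Java equivalent of the same declare-against-the-interface style (names are illustrative):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class InterfaceTypedFields {
  // Declared against the interface; the concrete map stays an implementation detail that
  // could later be swapped for another ConcurrentMap without touching callers.
  private final ConcurrentMap<String, String> endpoints = new ConcurrentHashMap<>();

  public void register(String name, String endpointRef) {
    endpoints.put(name, endpointRef);
  }
}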

core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala

Lines changed: 8 additions & 4 deletions
@@ -66,14 +66,18 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   }
 
   override def addFile(file: File): String = {
-    require(files.putIfAbsent(file.getName(), file) == null,
-      s"File ${file.getName()} already registered.")
+    val existingPath = files.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"File ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file")
     s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addJar(file: File): String = {
-    require(jars.putIfAbsent(file.getName(), file) == null,
-      s"JAR ${file.getName()} already registered.")
+    val existingPath = jars.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"File ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file")
     s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
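Where the old code rejected any second registration outright, the new check tolerates re-registering the same File under the same name and only fails on a conflicting path. A minimal sketch of that "same value is a no-op, different value is an error" pattern, using Strings instead of Files:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class IdempotentRegistrySketch {
  private final ConcurrentMap<String, String> files = new ConcurrentHashMap<>();

  // Registering the same name->path pair twice is harmless; a conflicting path fails fast.
  public void addFile(String name, String path) {
    String existing = files.putIfAbsent(name, path);
    if (existing != null && !existing.equals(path)) {
      throw new IllegalArgumentException("File " + name +
        " was already registered with a different path (old path = " + existing +
        ", new path = " + path + ")");
    }
  }

  public static void main(String[] args) {
    IdempotentRegistrySketch registry = new IdempotentRegistrySketch();
    registry.addFile("data.txt", "/tmp/a/data.txt");
    registry.addFile("data.txt", "/tmp/a/data.txt");    // allowed: same path
    // registry.addFile("data.txt", "/tmp/b/data.txt"); // would throw
  }
}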

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 3 additions & 2 deletions
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
 import java.util.Properties
 
+import scala.collection.mutable
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
@@ -198,8 +199,8 @@ private[spark] object Task {
    */
   def serializeWithDependencies(
       task: Task[_],
-      currentFiles: HashMap[String, Long],
-      currentJars: HashMap[String, Long],
+      currentFiles: mutable.Map[String, Long],
+      currentJars: mutable.Map[String, Long],
       serializer: SerializerInstance)
     : ByteBuffer = {
 
