
Commit 045a9be

Merge remote-tracking branch 'origin/master' into typed_udaf
2 parents: 7a136c5 + fbf8d00

File tree: 155 files changed, +3099 -1605 lines


LICENSE

Lines changed: 2 additions & 3 deletions
@@ -257,9 +257,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (BSD-style) scalacheck (org.scalacheck:scalacheck_2.11:1.10.0 - http://www.scalacheck.org)
      (BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org)
      (BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org)
-     (New BSD License) Kryo (com.esotericsoftware.kryo:kryo:2.21 - http://code.google.com/p/kryo/)
-     (New BSD License) MinLog (com.esotericsoftware.minlog:minlog:1.2 - http://code.google.com/p/minlog/)
-     (New BSD License) ReflectASM (com.esotericsoftware.reflectasm:reflectasm:1.07 - http://code.google.com/p/reflectasm/)
+     (New BSD License) Kryo (com.esotericsoftware:kryo:3.0.3 - https://github.com/EsotericSoftware/kryo)
+     (New BSD License) MinLog (com.esotericsoftware:minlog:1.3.0 - https://github.com/EsotericSoftware/minlog)
      (New BSD license) Protocol Buffer Java API (com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
      (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
      (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)

build/mvn

Lines changed: 7 additions & 3 deletions
@@ -70,9 +70,10 @@ install_app() {
 # Install maven under the build/ folder
 install_mvn() {
   local MVN_VERSION="3.3.9"
+  local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua?action=download&filename='}

   install_app \
-    "https://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
+    "${APACHE_MIRROR}/maven/maven-3/${MVN_VERSION}/binaries" \
     "apache-maven-${MVN_VERSION}-bin.tar.gz" \
     "apache-maven-${MVN_VERSION}/bin/mvn"

@@ -83,8 +84,10 @@ install_mvn() {
 install_zinc() {
   local zinc_path="zinc-0.3.9/bin/zinc"
   [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
+  local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}
+
   install_app \
-    "https://downloads.typesafe.com/zinc/0.3.9" \
+    "${TYPESAFE_MIRROR}/zinc/0.3.9" \
     "zinc-0.3.9.tgz" \
     "${zinc_path}"
   ZINC_BIN="${_DIR}/${zinc_path}"
@@ -98,9 +101,10 @@ install_scala() {
   local scala_version=`grep "scala.version" "${_DIR}/../pom.xml" | \
                        head -1 | cut -f2 -d'>' | cut -f1 -d'<'`
   local scala_bin="${_DIR}/scala-${scala_version}/bin/scala"
+  local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}

   install_app \
-    "https://downloads.typesafe.com/scala/${scala_version}" \
+    "${TYPESAFE_MIRROR}/scala/${scala_version}" \
     "scala-${scala_version}.tgz" \
     "scala-${scala_version}/bin/scala"

core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java renamed to core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java

Lines changed: 6 additions & 8 deletions
@@ -20,20 +20,17 @@
 import java.io.InputStream;
 import java.util.zip.Checksum;

-import net.jpountz.lz4.LZ4BlockOutputStream;
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
 import net.jpountz.util.SafeUtils;
-import net.jpountz.xxhash.StreamingXXHash32;
-import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;

 /**
  * {@link InputStream} implementation to decode data written with
- * {@link LZ4BlockOutputStream}. This class is not thread-safe and does not
+ * {@link net.jpountz.lz4.LZ4BlockOutputStream}. This class is not thread-safe and does not
  * support {@link #mark(int)}/{@link #reset()}.
- * @see LZ4BlockOutputStream
+ * @see net.jpountz.lz4.LZ4BlockOutputStream
  *
  * This is based on net.jpountz.lz4.LZ4BlockInputStream
  *
@@ -90,12 +87,13 @@ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Che
   }

   /**
-   * Create a new instance using {@link XXHash32} for checksuming.
+   * Create a new instance using {@link net.jpountz.xxhash.XXHash32} for checksuming.
    * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum)
-   * @see StreamingXXHash32#asChecksum()
+   * @see net.jpountz.xxhash.StreamingXXHash32#asChecksum()
    */
   public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
-    this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
+    this(in, decompressor,
+      XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
   }

   /**

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 5 additions & 9 deletions
@@ -716,25 +716,21 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       offset += klen;
       Platform.copyMemory(vbase, voff, base, offset, vlen);
       offset += vlen;
-      Platform.putLong(base, offset, 0);
+      // put this value at the beginning of the list
+      Platform.putLong(base, offset, isDefined ? longArray.get(pos * 2) : 0);

       // --- Update bookkeeping data structures ----------------------------------------------------
       offset = currentPage.getBaseOffset();
       Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
       pageCursor += recordLength;
       final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
         currentPage, recordOffset);
+      longArray.set(pos * 2, storedKeyAddress);
+      updateAddressesAndSizes(storedKeyAddress);
       numValues++;
-      if (isDefined) {
-        // put this pair at the end of chain
-        while (nextValue()) { /* do nothing */ }
-        Platform.putLong(baseObject, valueOffset + valueLength, storedKeyAddress);
-        nextValue(); // point to new added value
-      } else {
+      if (!isDefined) {
         numKeys++;
-        longArray.set(pos * 2, storedKeyAddress);
         longArray.set(pos * 2 + 1, keyHashcode);
-        updateAddressesAndSizes(storedKeyAddress);
         isDefined = true;

         if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {

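The hunk above changes how BytesToBytesMap.append() links values that share a key: instead of walking to the tail of the key's value chain before attaching the new record, the new record stores the previous head's address and the bucket slot in longArray is repointed at it, turning the append into an O(1) prepend. Below is a simplified, on-heap Scala sketch of that idea; the names (Record, TinyMultiMap) are illustrative only, the real code manipulates raw off-heap memory through Platform and longArray.

// Simplified model of the prepend-to-chain strategy used by the new append().
final case class Record(value: Array[Byte], next: Option[Record])

final class TinyMultiMap {
  private val buckets = scala.collection.mutable.Map.empty[String, Record]

  def append(key: String, value: Array[Byte]): Unit = {
    val previousHead = buckets.get(key)       // analogous to longArray.get(pos * 2)
    val newHead = Record(value, previousHead) // new record points at the old head
    buckets(key) = newHead                    // bucket now points at the new record
  }

  // Iterate all values stored for a key, newest first (matches prepend order).
  def get(key: String): Iterator[Array[Byte]] =
    Iterator.iterate(buckets.get(key))(_.flatMap(_.next))
      .takeWhile(_.isDefined)
      .map(_.get.value)
}
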
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
@@ -1356,7 +1356,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Register a listener to receive up-calls from events that happen during execution.
    */
   @DeveloperApi
-  def addSparkListener(listener: SparkListener) {
+  def addSparkListener(listener: SparkListenerInterface) {
     listenerBus.addListener(listener)
   }

@@ -2007,15 +2007,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       // Use reflection to find the right constructor
       val constructors = {
         val listenerClass = Utils.classForName(className)
-        listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
+        listenerClass
+          .getConstructors
+          .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
       }
       val constructorTakingSparkConf = constructors.find { c =>
         c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
       }
       lazy val zeroArgumentConstructor = constructors.find { c =>
         c.getParameterTypes.isEmpty
       }
-      val listener: SparkListener = {
+      val listener: SparkListenerInterface = {
         if (constructorTakingSparkConf.isDefined) {
           constructorTakingSparkConf.get.newInstance(conf)
         } else if (zeroArgumentConstructor.isDefined) {

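The signature change above relaxes addSparkListener to accept any SparkListenerInterface rather than only SparkListener subclasses. Existing callers keep working because SparkListener implements that interface; a minimal usage sketch (app name and master are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("listener-demo").setMaster("local[*]"))
    // A plain SparkListener still satisfies the broadened
    // addSparkListener(listener: SparkListenerInterface) signature.
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
        println(s"task ended in stage ${taskEnd.stageId}")
    })
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}
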
core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 2 additions & 1 deletion
@@ -314,7 +314,8 @@ object SparkEnv extends Logging {
       UnifiedMemoryManager(conf, numUsableCores)
     }

-    val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
+    val blockTransferService =
+      new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)

     val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
       BlockManagerMaster.DRIVER_ENDPOINT_NAME,

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
@@ -321,7 +321,7 @@ private[spark] class Executor(
         logInfo(s"Executor killed $taskName (TID $taskId)")
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

-      case cDE: CommitDeniedException =>
+      case CausedBy(cDE: CommitDeniedException) =>
         val reason = cDE.toTaskEndReason
         execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

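The catch clause above now matches a CommitDeniedException even when it arrives wrapped inside another exception, by pattern-matching through a CausedBy extractor. A minimal sketch of how such an extractor can be written (an illustration, not necessarily Spark's exact implementation):

object CausedBy {
  // Walk the cause chain and expose the root cause (or the exception itself
  // if it has no cause), so a pattern like
  //   case CausedBy(cde: CommitDeniedException) => ...
  // also matches exceptions that wrap a CommitDeniedException.
  def unapply(e: Throwable): Option[Throwable] =
    Option(e.getCause).flatMap(unapply).orElse(Some(e))
}
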
core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 6 additions & 10 deletions
@@ -33,11 +33,8 @@ object SparkHadoopMapRedUtil extends Logging {
    * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
    * details).
    *
-   * Output commit coordinator is only contacted when the following two configurations are both set
-   * to `true`:
-   *
-   *  - `spark.speculation`
-   *  - `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+   * is set to true (which is the default).
    */
   def commitTask(
       committer: MapReduceOutputCommitter,
@@ -64,11 +61,10 @@ object SparkHadoopMapRedUtil extends Logging {
     if (committer.needsTaskCommit(mrTaskContext)) {
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+        // We only need to coordinate with the driver if there are concurrent task attempts.
+        // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
       }

       if (shouldCoordinateWithDriver) {

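With the change above, output commit coordination no longer depends on spark.speculation; it is on by default and controlled only by spark.hadoop.outputCommitCoordination.enabled. A short sketch of using the escape hatch from user code, with the key taken from the diff above and the rest of the configuration purely illustrative:

import org.apache.spark.SparkConf

// Setting the flag to "false" restores the old uncoordinated commit behaviour;
// it exists only as an escape hatch.
val conf = new SparkConf()
  .setAppName("commit-coordination-demo")
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")
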
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 7 additions & 5 deletions
@@ -39,7 +39,11 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at at time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
+private[spark] class NettyBlockTransferService(
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    override val hostName: String,
+    numCores: Int)
   extends BlockTransferService {

   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
@@ -65,13 +69,13 @@ class NettyBlockTransferService(conf: SparkConf, securityManage
     clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
     server = createServer(serverBootstrap.toList)
     appId = conf.getAppId
-    logInfo("Server created on " + server.getPort)
+    logInfo(s"Server created on ${hostName}:${server.getPort}")
   }

   /** Creates and binds the TransportServer, possibly trying multiple ports. */
   private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
     def startService(port: Int): (TransportServer, Int) = {
-      val server = transportContext.createServer(port, bootstraps.asJava)
+      val server = transportContext.createServer(hostName, port, bootstraps.asJava)
       (server, server.getPort)
     }

@@ -109,8 +113,6 @@ class NettyBlockTransferService(conf: SparkConf, securityManage
     }
   }

-  override def hostName: String = Utils.localHostName()
-
   override def port: Int = server.getPort

   override def uploadBlock(

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 6 deletions
@@ -1111,9 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
         recordsWritten += 1
       }
-    } {
-      writer.close(hadoopContext)
-    }
+    }(finallyBlock = writer.close(hadoopContext))
     committer.commitTask(hadoopContext)
     outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
       om.setBytesWritten(callback())
@@ -1200,9 +1198,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
         recordsWritten += 1
       }
-    } {
-      writer.close()
-    }
+    }(finallyBlock = writer.close())
     writer.commit()
     outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
       om.setBytesWritten(callback())

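Both hunks above replace a bare second brace-block argument with an explicit named parameter, finallyBlock = writer.close(...), which makes it clear at the call site that the closure is cleanup rather than part of the write loop. A minimal sketch of a helper with that call shape (an illustration of the pattern, not Spark's actual utility):

// Run `block`, always running `finallyBlock` afterwards, even if `block` throws.
def tryWithFinally[T](block: => T)(finallyBlock: => Unit): T =
  try block finally finallyBlock

// Call shape matching the diff above:
// tryWithFinally {
//   records.foreach(writer.write)
// }(finallyBlock = writer.close())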