
Commit 20e667c
Merge branch 'master' into yarn_ClientBase
2 parents: 0961151 + ce92a9c

107 files changed: +1608 −364 lines (only a subset of the changed files is shown below)


.gitignore

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 sbt/*.jar
 .settings
 .cache
-.generated-mima-excludes
+.generated-mima*
 /build/
 work/
 out/

bin/pyspark

Lines changed: 13 additions & 7 deletions
@@ -45,7 +45,7 @@ fi
 . $FWDIR/bin/load-spark-env.sh

 # Figure out which Python executable to use
-if [ -z "$PYSPARK_PYTHON" ] ; then
+if [[ -z "$PYSPARK_PYTHON" ]]; then
   PYSPARK_PYTHON="python"
 fi
 export PYSPARK_PYTHON
@@ -59,7 +59,7 @@ export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py

 # If IPython options are specified, assume user wants to run IPython
-if [ -n "$IPYTHON_OPTS" ]; then
+if [[ -n "$IPYTHON_OPTS" ]]; then
   IPYTHON=1
 fi

@@ -76,6 +76,16 @@ for i in "$@"; do
 done
 export PYSPARK_SUBMIT_ARGS

+# For pyspark tests
+if [[ -n "$SPARK_TESTING" ]]; then
+  if [[ -n "$PYSPARK_DOC_TEST" ]]; then
+    exec "$PYSPARK_PYTHON" -m doctest $1
+  else
+    exec "$PYSPARK_PYTHON" $1
+  fi
+  exit
+fi
+
 # If a python file is provided, directly run spark-submit.
 if [[ "$1" =~ \.py$ ]]; then
   echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
@@ -86,10 +96,6 @@ else
   if [[ "$IPYTHON" = "1" ]]; then
     exec ipython $IPYTHON_OPTS
   else
-    if [[ -n $SPARK_TESTING ]]; then
-      exec "$PYSPARK_PYTHON" -m doctest
-    else
-      exec "$PYSPARK_PYTHON"
-    fi
+    exec "$PYSPARK_PYTHON"
   fi
 fi

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }

   /** Register a ShuffleDependency for cleanup when it is garbage collected. */
-  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _]) {
+  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
   }

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 9 additions & 3 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleHandle

 /**
  * :: DeveloperApi ::
@@ -50,19 +51,24 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
+ * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
  *                   the default serializer, as specified by `spark.serializer` config option, will
  *                   be used.
  */
 @DeveloperApi
-class ShuffleDependency[K, V](
+class ShuffleDependency[K, V, C](
     @transient rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
-    val serializer: Serializer = null)
+    val serializer: Option[Serializer] = None,
+    val keyOrdering: Option[Ordering[K]] = None,
+    val aggregator: Option[Aggregator[K, V, C]] = None)
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

   val shuffleId: Int = rdd.context.newShuffleId()

+  val shuffleHandle: ShuffleHandle = rdd.context.env.shuffleManager.registerShuffle(
+    shuffleId, rdd.partitions.size, this)
+
   rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
 }
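With the extra combiner type parameter C and the Option-valued fields, call sites that previously passed a nullable Serializer now wrap it in Some(...) or leave the default None. A minimal construction sketch, assuming only the signature shown in this diff (prevRdd, part, and ser are hypothetical values already in scope; constructing the dependency also registers it with the shuffle manager through the new shuffleHandle field):

val dep = new ShuffleDependency[String, Int, Int](
  prevRdd,                  // hypothetical RDD[(String, Int)] feeding the shuffle
  part,                     // hypothetical Partitioner for the shuffle output
  serializer = Some(ser),   // was a nullable Serializer before this change
  keyOrdering = None,       // optional ordering of keys
  aggregator = None)        // optional map-side combine logic (the C type)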

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 0 deletions
@@ -823,9 +823,11 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /**
+   * :: DeveloperApi ::
    * Return information about what RDDs are cached, if they are in mem or on disk, how much space
    * they take, etc.
    */
+  @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
     StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
   }
@@ -837,8 +839,10 @@
   def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

   /**
+   * :: DeveloperApi ::
    * Return information about blocks stored in all of the slaves
    */
+  @DeveloperApi
   def getExecutorStorageStatus: Array[StorageStatus] = {
     env.blockManager.master.getStorageStatus
   }

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 18 additions & 10 deletions
@@ -34,6 +34,7 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.util.{AkkaUtils, Utils}

@@ -56,7 +57,7 @@ class SparkEnv (
     val closureSerializer: Serializer,
     val cacheManager: CacheManager,
     val mapOutputTracker: MapOutputTracker,
-    val shuffleFetcher: ShuffleFetcher,
+    val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
     val blockManager: BlockManager,
     val connectionManager: ConnectionManager,
@@ -80,7 +81,7 @@
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
     mapOutputTracker.stop()
-    shuffleFetcher.stop()
+    shuffleManager.stop()
     broadcastManager.stop()
     blockManager.stop()
     blockManager.master.stop()
@@ -163,13 +164,20 @@ object SparkEnv extends Logging {
     def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
       val name = conf.get(propertyName, defaultClassName)
       val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
-      // First try with the constructor that takes SparkConf. If we can't find one,
-      // use a no-arg constructor instead.
+      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
+      // SparkConf, then one taking no arguments
       try {
-        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
+        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
+          .newInstance(conf, new java.lang.Boolean(isDriver))
+          .asInstanceOf[T]
       } catch {
         case _: NoSuchMethodException =>
-          cls.getConstructor().newInstance().asInstanceOf[T]
+          try {
+            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
+          } catch {
+            case _: NoSuchMethodException =>
+              cls.getConstructor().newInstance().asInstanceOf[T]
+          }
       }
     }

@@ -219,9 +227,6 @@
 
     val cacheManager = new CacheManager(blockManager)

-    val shuffleFetcher = instantiateClass[ShuffleFetcher](
-      "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")
-
     val httpFileServer = new HttpFileServer(securityManager)
     httpFileServer.initialize()
     conf.set("spark.fileserver.uri", httpFileServer.serverUri)
@@ -242,6 +247,9 @@
       "."
     }

+    val shuffleManager = instantiateClass[ShuffleManager](
+      "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+
     // Warn about deprecated spark.cache.class property
     if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +
@@ -255,7 +263,7 @@
       closureSerializer,
       cacheManager,
       mapOutputTracker,
-      shuffleFetcher,
+      shuffleManager,
       broadcastManager,
       blockManager,
       connectionManager,
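The change above makes the shuffle implementation pluggable: SparkEnv now reflectively instantiates whatever class spark.shuffle.manager names, trying a (SparkConf, Boolean) constructor first, then (SparkConf), then a no-arg one. A minimal sketch of the user-facing side, assuming only what this diff shows; the master URL and application name below are illustrative, and HashShuffleManager is already the default, so setting it explicitly is redundant and only demonstrates the knob:

import org.apache.spark.{SparkConf, SparkContext}

// Select the shuffle manager by fully qualified class name.
val conf = new SparkConf()
  .setMaster("local[2]")                // illustrative master
  .setAppName("shuffle-manager-demo")   // illustrative app name
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
val sc = new SparkContext(conf)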

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 44 additions & 0 deletions
@@ -228,6 +228,50 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       : PartialResult[java.util.Map[K, BoundedDouble]] =
     rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)

+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U],
+      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue, partitioner)(seqFunc, combFunc))
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],
+      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
+   * The former operation is used for merging values within a partition, and the latter is used for
+   * merging values between partitions. To avoid memory allocation, both of these functions are
+   * allowed to modify and return their first argument instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]):
+      JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
+  }
+
   /**
    * Merge the values for each key using an associative function and a neutral "zero value" which
    * may be added to the result an arbitrary number of times, and must not change the result
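All three overloads delegate to the Scala RDD.aggregateByKey visible in the calls above. As a minimal usage sketch against the underlying Scala pair RDD (assuming a SparkContext named sc; the data and the (count, sum) accumulator are illustrative), aggregateByKey lets the result type U differ from the value type V:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
// Zero value (0, 0) is a (count, sum) pair. seqOp folds a value into the
// per-partition accumulator; combOp merges accumulators across partitions.
val countAndSum = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + 1, acc._2 + v),
  (a, b)   => (a._1 + b._1, a._2 + b._2))
// countAndSum.collect() would yield Array(("a", (2, 3)), ("b", (1, 3))) (order may vary)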

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends

       case "kill" =>
         val driverId = driverArgs.driverId
-        val killFuture = masterActor ! RequestKillDriver(driverId)
+        masterActor ! RequestKillDriver(driverId)
     }
   }

core/src/main/scala/org/apache/spark/network/Connection.scala

Lines changed: 2 additions & 2 deletions
@@ -210,7 +210,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     var nextMessageToBeUsed = 0

     def addMessage(message: Message) {
-      messages.synchronized{
+      messages.synchronized {
         /* messages += message */
         messages.enqueue(message)
         logDebug("Added [" + message + "] to outbox for sending to " +
@@ -223,7 +223,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
       while (!messages.isEmpty) {
         /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
         /* val message = messages(nextMessageToBeUsed) */
-        val message = messages.dequeue
+        val message = messages.dequeue()
         val chunk = message.getChunkForSending(defaultChunkSize)
         if (chunk.isDefined) {
           messages.enqueue(message)

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 9 additions & 9 deletions
@@ -250,14 +250,14 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     try {
       while(!selectorThread.isInterrupted) {
         while (! registerRequests.isEmpty) {
-          val conn: SendingConnection = registerRequests.dequeue
+          val conn: SendingConnection = registerRequests.dequeue()
           addListeners(conn)
           conn.connect()
           addConnection(conn)
         }

         while(!keyInterestChangeRequests.isEmpty) {
-          val (key, ops) = keyInterestChangeRequests.dequeue
+          val (key, ops) = keyInterestChangeRequests.dequeue()

           try {
             if (key.isValid) {
@@ -532,9 +532,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
         }
         return
       }
-      var securityMsgResp = SecurityMessage.fromResponse(replyToken,
+      val securityMsgResp = SecurityMessage.fromResponse(replyToken,
         securityMsg.getConnectionId.toString())
-      var message = securityMsgResp.toBufferMessage
+      val message = securityMsgResp.toBufferMessage
       if (message == null) throw new Exception("Error creating security message")
       sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message)
     } catch {
@@ -568,9 +568,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
         logDebug("Server sasl not completed: " + connection.connectionId)
       }
       if (replyToken != null) {
-        var securityMsgResp = SecurityMessage.fromResponse(replyToken,
+        val securityMsgResp = SecurityMessage.fromResponse(replyToken,
           securityMsg.getConnectionId)
-        var message = securityMsgResp.toBufferMessage
+        val message = securityMsgResp.toBufferMessage
         if (message == null) throw new Exception("Error creating security Message")
         sendSecurityMessage(connection.getRemoteConnectionManagerId(), message)
       }
@@ -618,7 +618,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
         return true
       }
     }
-    return false
+    false
   }

   private def handleMessage(
@@ -694,9 +694,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     var firstResponse: Array[Byte] = null
     try {
       firstResponse = conn.sparkSaslClient.firstToken()
-      var securityMsg = SecurityMessage.fromResponse(firstResponse,
+      val securityMsg = SecurityMessage.fromResponse(firstResponse,
         conn.connectionId.toString())
-      var message = securityMsg.toBufferMessage
+      val message = securityMsg.toBufferMessage
       if (message == null) throw new Exception("Error creating security message")
       connectionsAwaitingSasl += ((conn.connectionId, conn))
       sendSecurityMessage(connManagerId, message)
