Commit 47f3510

Merge branch 'master' of git://git.apache.org/spark into SPARK-2894

2 parents: 2c899ed + b431e67
File tree

28 files changed: +228 -85 lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 25 additions & 6 deletions

@@ -17,14 +17,15 @@
 
 package org.apache.spark.broadcast
 
-import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
+  ObjectInputStream, ObjectOutputStream, OutputStream}
 
 import scala.reflect.ClassTag
 import scala.util.Random
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.Utils
 
 /**
  * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
@@ -214,11 +215,15 @@ private[broadcast] object TorrentBroadcast extends Logging {
   private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
   private var initialized = false
   private var conf: SparkConf = null
+  private var compress: Boolean = false
+  private var compressionCodec: CompressionCodec = null
 
   def initialize(_isDriver: Boolean, conf: SparkConf) {
     TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
     synchronized {
       if (!initialized) {
+        compress = conf.getBoolean("spark.broadcast.compress", true)
+        compressionCodec = CompressionCodec.createCodec(conf)
         initialized = true
       }
     }
@@ -228,8 +233,13 @@ private[broadcast] object TorrentBroadcast extends Logging {
     initialized = false
   }
 
-  def blockifyObject[T](obj: T): TorrentInfo = {
-    val byteArray = Utils.serialize[T](obj)
+  def blockifyObject[T: ClassTag](obj: T): TorrentInfo = {
+    val bos = new ByteArrayOutputStream()
+    val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
+    val ser = SparkEnv.get.serializer.newInstance()
+    val serOut = ser.serializeStream(out)
+    serOut.writeObject[T](obj).close()
+    val byteArray = bos.toByteArray
     val bais = new ByteArrayInputStream(byteArray)
 
     var blockNum = byteArray.length / BLOCK_SIZE
@@ -255,7 +265,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
     info
   }
 
-  def unBlockifyObject[T](
+  def unBlockifyObject[T: ClassTag](
       arrayOfBlocks: Array[TorrentBlock],
       totalBytes: Int,
       totalBlocks: Int): T = {
@@ -264,7 +274,16 @@ private[broadcast] object TorrentBroadcast extends Logging {
       System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
         i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
     }
-    Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader)
+
+    val in: InputStream = {
+      val arrIn = new ByteArrayInputStream(retByteArray)
+      if (compress) compressionCodec.compressedInputStream(arrIn) else arrIn
+    }
+    val ser = SparkEnv.get.serializer.newInstance()
+    val serIn = ser.deserializeStream(in)
+    val obj = serIn.readObject[T]()
+    serIn.close()
+    obj
   }
 
   /**
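
The upshot of this change: TorrentBroadcast now serializes with the application's configured Serializer and compresses each broadcast with the configured CompressionCodec, instead of always using Java serialization via Utils. A minimal usage sketch of the relevant settings (the property names come from this diff and the test below; the local master and app name are only illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: spark.broadcast.compress defaults to true (see initialize above);
// spark.serializer selects the serializer used by blockifyObject/unBlockifyObject.
val conf = new SparkConf()
  .setMaster("local[2]")                 // illustrative
  .setAppName("broadcast-compress-demo") // illustrative
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
  .set("spark.broadcast.compress", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val bc = sc.broadcast(1 to 1000000)      // serialized, compressed, split into 4 MB blocks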

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 3 additions & 2 deletions

@@ -136,7 +136,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+    webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
     registerWithMaster()
 
@@ -373,7 +373,8 @@ private[spark] class Worker(
 private[spark] object Worker extends Logging {
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
-    val args = new WorkerArguments(argStrings)
+    val conf = new SparkConf
+    val args = new WorkerArguments(argStrings, conf)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
       args.memory, args.masters, args.workDir)
     actorSystem.awaitTermination()

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 5 additions & 1 deletion

@@ -20,11 +20,12 @@ package org.apache.spark.deploy.worker
 import java.lang.management.ManagementFactory
 
 import org.apache.spark.util.{IntParam, MemoryParam, Utils}
+import org.apache.spark.SparkConf
 
 /**
  * Command-line parser for the worker.
  */
-private[spark] class WorkerArguments(args: Array[String]) {
+private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
   var port = 0
   var webUiPort = 8081
@@ -46,6 +47,9 @@ private[spark] class WorkerArguments(args: Array[String]) {
   if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
   }
+  if (conf.contains("spark.worker.ui.port")) {
+    webUiPort = conf.get("spark.worker.ui.port").toInt
+  }
   if (System.getenv("SPARK_WORKER_DIR") != null) {
     workDir = System.getenv("SPARK_WORKER_DIR")
   }
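
A small sketch of the resulting precedence: the new spark.worker.ui.port check runs after the SPARK_WORKER_WEBUI_PORT check, so the conf value wins over the env var, and both beat the 8081 default. (WorkerArguments is private[spark], so this only compiles inside Spark itself, e.g. in a test; the master URL and port are illustrative.)

import org.apache.spark.SparkConf
import org.apache.spark.deploy.worker.WorkerArguments

val conf = new SparkConf().set("spark.worker.ui.port", "8090")
val args = new WorkerArguments(Array("spark://master:7077"), conf)
assert(args.webUiPort == 8090)  // conf overrides SPARK_WORKER_WEBUI_PORT and the 8081 default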

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 2 additions & 7 deletions

@@ -34,8 +34,8 @@ private[spark]
 class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
-    port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI")
+    requestedPort: Int)
+  extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)
@@ -55,10 +55,5 @@ class WorkerWebUI(
 }
 
 private[spark] object WorkerWebUI {
-  val DEFAULT_PORT = 8081
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
-
-  def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
-    requestedPort.getOrElse(conf.getInt("spark.worker.ui.port", WorkerWebUI.DEFAULT_PORT))
-  }
 }

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 3 additions & 3 deletions

@@ -119,11 +119,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
 
   /**
    * Compute a histogram using the provided buckets. The buckets are all open
-   * to the left except for the last which is closed
+   * to the right except for the last which is closed
    * e.g. for the array
    * [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50]
-   * e.g 1<=x<10 , 10<=x<20, 20<=x<50
-   * And on the input of 1 and 50 we would have a histogram of 1, 0, 0
+   * e.g 1<=x<10 , 10<=x<20, 20<=x<=50
+   * And on the input of 1 and 50 we would have a histogram of 1, 0, 1
    *
    * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
    * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets
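
The corrected doc comment in action, as a quick check (assumes an active SparkContext named sc):

// Buckets [1, 10), [10, 20), [20, 50]; the last bucket is closed, so 50 falls into it.
val counts = sc.parallelize(Seq(1.0, 50.0)).histogram(Array(1.0, 10.0, 20.0, 50.0))
// counts == Array(1L, 0L, 1L)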

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 0 deletions

@@ -1233,6 +1233,11 @@ abstract class RDD[T: ClassTag](
     dependencies.head.rdd.asInstanceOf[RDD[U]]
   }
 
+  /** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */
+  protected[spark] def parent[U: ClassTag](j: Int) = {
+    dependencies(j).rdd.asInstanceOf[RDD[U]]
+  }
+
   /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
   def context = sc
 
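
UnionRDD below is the first caller of the new helper. As a sketch of how any subclass can use it (IdentityRDD is hypothetical, not part of this patch):

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical pass-through RDD: reaches its single parent via parent[T](0),
// which is equivalent to firstParent[T].
class IdentityRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {
  override def getPartitions: Array[Partition] = parent[T](0).partitions
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent[T](0).iterator(split, context)
}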

core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala

Lines changed: 1 addition & 2 deletions

@@ -83,8 +83,7 @@ class UnionRDD[T: ClassTag](
 
   override def compute(s: Partition, context: TaskContext): Iterator[T] = {
     val part = s.asInstanceOf[UnionPartition[T]]
-    val parentRdd = dependencies(part.parentRddIndex).rdd.asInstanceOf[RDD[T]]
-    parentRdd.iterator(part.parentPartition, context)
+    parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] =

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 14 additions & 16 deletions

@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalRegisteredExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected executors)
+  // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+  var minRegisteredRatio =
+    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+  // Submit tasks after maxRegisteredWaitingTime milliseconds
+  // if minRegisteredRatio has not yet been reached
   val maxRegisteredWaitingTime =
-    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
-  var ready = if (minRegisteredRatio <= 0) true else false
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       executorAddress(executorId) = sender.path.address
       addressToExecutorId(sender.path.address) = executorId
       totalCoreCount.addAndGet(cores)
-      if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
-        ready = true
-        logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
-          executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
-          ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
-      }
+      totalRegisteredExecutors.addAndGet(1)
       makeOffers()
     }
 
@@ -268,14 +263,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
   }
 
+  def sufficientResourcesRegistered(): Boolean = true
+
   override def isReady(): Boolean = {
-    if (ready) {
+    if (sufficientResourcesRegistered) {
+      logInfo("SchedulerBackend is ready for scheduling beginning after " +
+        s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
       return true
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
-      ready = true
       logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
-        "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
       return true
     }
     false
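
For reference, the renamed settings are used like this (the values are only illustrative; the property names are the ones introduced in this diff):

import org.apache.spark.SparkConf

// Start scheduling once 80% of the expected resources have registered,
// or after 60 s of waiting, whichever comes first.
val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60000")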

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 5 additions & 1 deletion

@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  val totalExpectedCores = maxCores.getOrElse(0)
 
   override def start() {
     super.start()
@@ -97,7 +98,6 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
-    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
@@ -110,4 +110,8 @@ private[spark] class SparkDeploySchedulerBackend(
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason.toString)
   }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
+  }
 }
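
A quick worked check of the standalone condition above, with illustrative numbers:

// With spark.cores.max = 16 and minRegisteredResourcesRatio = 0.8, the backend
// considers resources sufficient once 13 cores have registered (13 >= 16 * 0.8 = 12.8).
val totalExpectedCores = 16
val minRegisteredRatio = 0.8
val registeredCores = 13
val sufficient = registeredCores >= totalExpectedCores * minRegisteredRatio  // true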

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

Lines changed: 8 additions & 2 deletions

@@ -44,7 +44,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
 
   test("Accessing HttpBroadcast variables in a local cluster") {
     val numSlaves = 4
-    sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", httpConf)
+    val conf = httpConf.clone
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.broadcast.compress", "true")
+    sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
@@ -69,7 +72,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
 
   test("Accessing TorrentBroadcast variables in a local cluster") {
     val numSlaves = 4
-    sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", torrentConf)
+    val conf = torrentConf.clone
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.broadcast.compress", "true")
+    sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))

0 commit comments