
Commit 0140d6e

Merge branch 'master' into netty-blockTransferService

2 parents: a3a09f6 + de700d3
30 files changed: +719 -449 lines

core/src/main/scala/org/apache/spark/MapOutputTracker.scala (1 addition, 28 deletions)

@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
 }
 
 private[spark] object MapOutputTracker {
-  private val LOG_BASE = 1.1
 
   // Serialize an array of map output locations into an efficient byte format so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
           throw new MetadataFetchFailedException(
             shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
         } else {
-          (status.location, decompressSize(status.compressedSizes(reduceId)))
+          (status.location, status.getSizeForBlock(reduceId))
         }
     }
   }
-
-  /**
-   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
-   * We do this by encoding the log base 1.1 of the size as an integer, which can support
-   * sizes up to 35 GB with at most 10% error.
-   */
-  def compressSize(size: Long): Byte = {
-    if (size == 0) {
-      0
-    } else if (size <= 1L) {
-      1
-    } else {
-      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
-    }
-  }
-
-  /**
-   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
-   */
-  def decompressSize(compressedSize: Byte): Long = {
-    if (compressedSize == 0) {
-      0
-    } else {
-      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
-    }
-  }
 }
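
Aside: the 8-bit size codec removed here (and reintroduced in MapStatus below) is simple enough to exercise outside Spark. The following is a minimal, self-contained sketch that restates compressSize/decompressSize exactly as they appear in the removed lines and checks the "at most 10% error" claim for one sample size; the object name SizeCodecDemo and the sample value are illustrative only.

    // Standalone restatement of the log-base-1.1 size codec shown in the removed lines above.
    object SizeCodecDemo {
      private val LOG_BASE = 1.1

      def compressSize(size: Long): Byte = {
        if (size == 0) {
          0
        } else if (size <= 1L) {
          1
        } else {
          math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
        }
      }

      def decompressSize(compressedSize: Byte): Long = {
        if (compressedSize == 0) 0 else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
      }

      def main(args: Array[String]): Unit = {
        val size = 123456789L  // an arbitrary ~118 MB block
        val decoded = decompressSize(compressSize(size))
        val error = math.abs(decoded - size).toDouble / size
        println(s"original=$size decoded=$decoded relativeError=${error * 100}%")
        // ceil() rounds the exponent up, so below the 255-exponent cap the estimate
        // lands in [size, size * 1.1) -- the "at most 10% error" from the comment.
      }
    }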

core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala (1 addition, 1 deletion)

@@ -205,6 +205,6 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
     val buffer = blockDataManager.getBlockData(blockId)
     logDebug("GetBlock " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs)
       + " and got buffer " + buffer)
-    buffer.nioByteBuffer()
+    if (buffer == null) null else buffer.nioByteBuffer()
   }
 }

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala (3 additions, 14 deletions)

@@ -20,15 +20,12 @@ package org.apache.spark.scheduler
 import java.io.{File, FileNotFoundException, IOException, PrintWriter}
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
-import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.executor.TaskMetrics
 
 /**
  * :: DeveloperApi ::
@@ -62,24 +59,16 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
     override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   }
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
 
   createLogDir()
 
-  // The following 5 functions are used only in testing.
-  private[scheduler] def getLogDir = logDir
-  private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
-  private[scheduler] def getStageIdToJobId = stageIdToJobId
-  private[scheduler] def getJobIdToStageIds = jobIdToStageIds
-  private[scheduler] def getEventQueue = eventQueue
-
   /** Create a folder for log files, the folder's name is the creation time of jobLogger */
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
     if (dir.exists()) {
       return
     }
-    if (dir.mkdirs() == false) {
+    if (!dir.mkdirs()) {
       // JobLogger should throw a exception rather than continue to construct this object.
       throw new IOException("create log directory error:" + logDir + "/" + logDirName + "/")
     }
@@ -261,7 +250,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
   protected def recordJobProperties(jobId: Int, properties: Properties) {
     if (properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
-      jobLogInfo(jobId, description, false)
+      jobLogInfo(jobId, description, withTime = false)
     }
   }
 

core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala (110 additions, 9 deletions)

@@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
  * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
  */
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
-  extends Externalizable {
+private[spark] sealed trait MapStatus {
+  /** Location where this task was run. */
+  def location: BlockManagerId
 
-  def this() = this(null, null)  // For deserialization only
+  /** Estimated size for the reduce block, in bytes. */
+  def getSizeForBlock(reduceId: Int): Long
+}
+
+
+private[spark] object MapStatus {
+
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+    if (uncompressedSizes.length > 2000) {
+      new HighlyCompressedMapStatus(loc, uncompressedSizes)
+    } else {
+      new CompressedMapStatus(loc, uncompressedSizes)
+    }
+  }
+
+  private[this] val LOG_BASE = 1.1
+
+  /**
+   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+   * We do this by encoding the log base 1.1 of the size as an integer, which can support
+   * sizes up to 35 GB with at most 10% error.
+   */
+  def compressSize(size: Long): Byte = {
+    if (size == 0) {
+      0
+    } else if (size <= 1L) {
+      1
+    } else {
+      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+    }
+  }
+
+  /**
+   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+   */
+  def decompressSize(compressedSize: Byte): Long = {
+    if (compressedSize == 0) {
+      0
+    } else {
+      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
+    }
+  }
+}
+
+
+/**
+ * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
+ * represented using a single byte.
+ *
+ * @param loc location where the task is being executed.
+ * @param compressedSizes size of the blocks, indexed by reduce partition id.
+ */
+private[spark] class CompressedMapStatus(
+    private[this] var loc: BlockManagerId,
+    private[this] var compressedSizes: Array[Byte])
+  extends MapStatus with Externalizable {
+
+  protected def this() = this(null, null.asInstanceOf[Array[Byte]])  // For deserialization only
+
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.map(MapStatus.compressSize))
+  }
 
-  def writeExternal(out: ObjectOutput) {
-    location.writeExternal(out)
+  override def location: BlockManagerId = loc
+
+  override def getSizeForBlock(reduceId: Int): Long = {
+    MapStatus.decompressSize(compressedSizes(reduceId))
+  }
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    loc.writeExternal(out)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }
 
-  def readExternal(in: ObjectInput) {
-    location = BlockManagerId(in)
-    compressedSizes = new Array[Byte](in.readInt())
+  override def readExternal(in: ObjectInput): Unit = {
+    loc = BlockManagerId(in)
+    val len = in.readInt()
+    compressedSizes = new Array[Byte](len)
     in.readFully(compressedSizes)
   }
 }
+
+
+/**
+ * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ *
+ * @param loc location where the task is being executed.
+ * @param avgSize average size of all the blocks
+ */
+private[spark] class HighlyCompressedMapStatus(
+    private[this] var loc: BlockManagerId,
+    private[this] var avgSize: Long)
+  extends MapStatus with Externalizable {
+
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.sum / uncompressedSizes.length)
+  }
+
+  protected def this() = this(null, 0L)  // For deserialization only
+
+  override def location: BlockManagerId = loc
+
+  override def getSizeForBlock(reduceId: Int): Long = avgSize
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    loc.writeExternal(out)
+    out.writeLong(avgSize)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    loc = BlockManagerId(in)
+    avgSize = in.readLong()
+  }
+}
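
For a sense of why the 2000-partition cutoff in MapStatus.apply exists, the sketch below does the arithmetic on the size-related payload each implementation writes in the writeExternal bodies above. It ignores the serialized BlockManagerId, and the object name and reducer count are made up for illustration.

    // Back-of-the-envelope payload comparison for the two MapStatus implementations,
    // counting only what the writeExternal bodies above emit for size information.
    object MapStatusPayloadDemo {
      def main(args: Array[String]): Unit = {
        val numReducers = 5000  // hypothetical reducer count, above the 2000 cutoff

        // CompressedMapStatus: writeInt(compressedSizes.length) + write(compressedSizes)
        val perBlockPayload = 4 + numReducers  // bytes: one Int plus one byte per reducer

        // HighlyCompressedMapStatus: writeLong(avgSize)
        val averageOnlyPayload = 8  // bytes: a single Long

        println(s"CompressedMapStatus size payload:       $perBlockPayload bytes")
        println(s"HighlyCompressedMapStatus size payload: $averageOnlyPayload bytes")
        // With 5000 reduce partitions, MapStatus.apply (length > 2000) would pick
        // HighlyCompressedMapStatus, trading per-block accuracy for a constant-size status.
      }
    }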

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala (3 additions, 5 deletions)

@@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V](
 
   private def commitWritesAndBuildStatus(): MapStatus = {
     // Commit the writes. Get the size of each bucket block (total block size).
-    val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
+    val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter =>
       writer.commitAndClose()
-      val size = writer.fileSegment().length
-      MapOutputTracker.compressSize(size)
+      writer.fileSegment().length
     }
-
-    new MapStatus(blockManager.blockManagerId, compressedSizes)
+    MapStatus(blockManager.blockManagerId, sizes)
   }
 
   private def revertWrites(): Unit = {

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala (1 addition, 2 deletions)

@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
     shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
 
-    mapStatus = new MapStatus(blockManager.blockManagerId,
-      partitionLengths.map(MapOutputTracker.compressSize))
+    mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
   }
 
   /** Close this writer, passing along whether the map completed */
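
Both shuffle writers now hand the raw per-partition byte counts to the MapStatus factory and let it choose a representation. The plain-Scala sketch below mirrors the `uncompressedSizes.sum / uncompressedSizes.length` constructor of HighlyCompressedMapStatus to show that, once the average-only status is chosen, every reducer sees the same estimate. The skewed partitionLengths values are made up, and a real status would only take this form above 2000 partitions.

    // Plain-Scala illustration of the averaging done by HighlyCompressedMapStatus.
    object AverageEstimateDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical, skewed partition lengths such as a shuffle writer might report.
        val partitionLengths: Array[Long] = Array(0L, 10L, 100L, 1000000L)
        val avgSize = partitionLengths.sum / partitionLengths.length
        partitionLengths.indices.foreach { reduceId =>
          println(s"reduce $reduceId: actual=${partitionLengths(reduceId)} bytes, estimate=$avgSize bytes")
        }
        // Every reducer receives the same estimate (here 250027 bytes), so block-level
        // skew is no longer visible once the average-only representation is in use.
      }
    }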

core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala (2 additions, 2 deletions)

@@ -58,9 +58,9 @@ class BlockManagerSlaveActor(
         SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
       }
 
-    case RemoveBroadcast(broadcastId, tellMaster) =>
+    case RemoveBroadcast(broadcastId, _) =>
       doAsync[Int]("removing broadcast " + broadcastId, sender) {
-        blockManager.removeBroadcast(broadcastId, tellMaster)
+        blockManager.removeBroadcast(broadcastId, tellMaster = true)
       }
 
     case GetBlockStatus(blockId, _) =>

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (0 additions, 1 deletion)

@@ -25,7 +25,6 @@ import scala.collection.Map
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
-import org.json4s.jackson.JsonMethods._
 
 
 import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
