
Commit 89671a2

viirya authored and cloud-fan committed
## What changes were proposed in this pull request?

This reverts the sequence of PRs #22344, #22330, #22239, and #16677, based on the discussion and comments at #16677 (comment).

## How was this patch tested?

Existing tests.

Closes #22481 from viirya/revert-SPARK-19355-1.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 7ff5386 commit 89671a2

File tree

30 files changed: +184 -504 lines changed


core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 2 additions & 3 deletions
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     if (!records.hasNext()) {
       partitionLengths = new long[numPartitions];
       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, 0);
+      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
       return;
     }
     final SerializerInstance serInstance = serializer.newInstance();
@@ -167,8 +167,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(
-      blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
+    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }

   @VisibleForTesting

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 1 addition & 2 deletions
@@ -248,8 +248,7 @@ void closeAndWriteOutput() throws IOException {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(
-      blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
+    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }

   @VisibleForTesting

core/src/main/scala/org/apache/spark/MapOutputStatistics.scala

Lines changed: 1 addition & 5 deletions
@@ -23,9 +23,5 @@ package org.apache.spark
  * @param shuffleId ID of the shuffle
  * @param bytesByPartitionId approximate number of output bytes for each map output partition
  *                           (may be inexact due to use of compressed map statuses)
- * @param recordsByPartitionId number of output records for each map output partition
  */
-private[spark] class MapOutputStatistics(
-    val shuffleId: Int,
-    val bytesByPartitionId: Array[Long],
-    val recordsByPartitionId: Array[Long])
+private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
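For orientation, here is a small, hedged sketch of what consumers of the reverted class now see: only the shuffle id and the approximate bytes per reduce partition, with no record counts. The stand-in class below mirrors the one-line definition above so the snippet compiles on its own; it is not the Spark source (the real class is private[spark]), and the sizes are made-up values.

```scala
// Stand-in mirroring the reverted definition so this example is self-contained.
class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])

object MapOutputStatisticsExample extends App {
  val stats = new MapOutputStatistics(10, Array(1000L, 0L, 512L))
  // Typical consumer logic: find the reduce partitions that actually received data.
  val nonEmpty = stats.bytesByPartitionId.zipWithIndex.collect { case (bytes, i) if bytes > 0 => i }
  println(nonEmpty.mkString(", ")) // prints: 0, 2
}
```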

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 8 deletions
@@ -522,19 +522,16 @@ private[spark] class MapOutputTrackerMaster(
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      val recordsByMapTask = new Array[Long](statuses.length)
-
       val parallelAggThreshold = conf.get(
         SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
       val parallelism = math.min(
         Runtime.getRuntime.availableProcessors(),
         statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
       if (parallelism <= 1) {
-        statuses.zipWithIndex.foreach { case (s, index) =>
+        for (s <- statuses) {
           for (i <- 0 until totalSizes.length) {
             totalSizes(i) += s.getSizeForBlock(i)
           }
-          recordsByMapTask(index) = s.numberOfOutput
         }
       } else {
         val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
@@ -551,11 +548,8 @@
         } finally {
           threadPool.shutdown()
         }
-        statuses.zipWithIndex.foreach { case (s, index) =>
-          recordsByMapTask(index) = s.numberOfOutput
-        }
       }
-      new MapOutputStatistics(dep.shuffleId, totalSizes, recordsByMapTask)
+      new MapOutputStatistics(dep.shuffleId, totalSizes)
     }
   }
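As a reading aid, this is a minimal sketch of the sequential aggregation path that getStatistics keeps after the revert: it only sums per-reduce-partition sizes across map statuses, with no per-map record counts. SizeOnlyStatus and the literal sizes are hypothetical stand-ins for Spark's MapStatus trait and real shuffle data, not Spark code.

```scala
// Hypothetical stand-in for the only part of MapStatus this loop needs.
trait SizeOnlyStatus { def getSizeForBlock(reduceId: Int): Long }

object GetStatisticsSketch extends App {
  val numPartitions = 3
  val statuses: Seq[SizeOnlyStatus] = Seq(
    new SizeOnlyStatus { def getSizeForBlock(i: Int): Long = Array(100L, 0L, 50L)(i) },
    new SizeOnlyStatus { def getSizeForBlock(i: Int): Long = Array(10L, 20L, 0L)(i) })

  // Same shape as the sequential branch above: accumulate sizes only.
  val totalSizes = new Array[Long](numPartitions)
  for (s <- statuses; i <- 0 until totalSizes.length) {
    totalSizes(i) += s.getSizeForBlock(i)
  }
  println(totalSizes.mkString(", ")) // 110, 20, 50 -- all that MapOutputStatistics now carries
}
```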

core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala

Lines changed: 12 additions & 31 deletions
@@ -31,8 +31,7 @@ import org.apache.spark.util.Utils

 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
- * task ran on, the sizes of outputs for each reducer, and the number of outputs of the map task,
- * for passing on to the reduce tasks.
+ * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
  */
 private[spark] sealed trait MapStatus {
   /** Location where this task was run. */
@@ -45,23 +44,18 @@
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  def getSizeForBlock(reduceId: Int): Long
-
-  /**
-   * The number of outputs for the map task.
-   */
-  def numberOfOutput: Long
 }


 private[spark] object MapStatus {

-  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long): MapStatus = {
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
     if (uncompressedSizes.length > Option(SparkEnv.get)
       .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
       .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
-      HighlyCompressedMapStatus(loc, uncompressedSizes, numOutput)
+      HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
-      new CompressedMapStatus(loc, uncompressedSizes, numOutput)
+      new CompressedMapStatus(loc, uncompressedSizes)
     }
   }

@@ -104,34 +98,29 @@
  */
 private[spark] class CompressedMapStatus(
     private[this] var loc: BlockManagerId,
-    private[this] var compressedSizes: Array[Byte],
-    private[this] var numOutput: Long)
+    private[this] var compressedSizes: Array[Byte])
   extends MapStatus with Externalizable {

-  protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1) // For deserialization only
+  protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only

-  def this(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long) {
-    this(loc, uncompressedSizes.map(MapStatus.compressSize), numOutput)
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.map(MapStatus.compressSize))
   }

   override def location: BlockManagerId = loc

-  override def numberOfOutput: Long = numOutput
-
   override def getSizeForBlock(reduceId: Int): Long = {
     MapStatus.decompressSize(compressedSizes(reduceId))
   }

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    out.writeLong(numOutput)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    numOutput = in.readLong()
     val len = in.readInt()
     compressedSizes = new Array[Byte](len)
     in.readFully(compressedSizes)
@@ -154,20 +143,17 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
-    private var hugeBlockSizes: Map[Int, Byte],
-    private[this] var numOutput: Long)
+    private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {

   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")

-  protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only
+  protected def this() = this(null, -1, null, -1, null) // For deserialization only

   override def location: BlockManagerId = loc

-  override def numberOfOutput: Long = numOutput
-
   override def getSizeForBlock(reduceId: Int): Long = {
     assert(hugeBlockSizes != null)
     if (emptyBlocks.contains(reduceId)) {
@@ -182,7 +168,6 @@

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    out.writeLong(numOutput)
     emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
     out.writeInt(hugeBlockSizes.size)
@@ -194,7 +179,6 @@

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    numOutput = in.readLong()
     emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
@@ -210,10 +194,7 @@
 }

 private[spark] object HighlyCompressedMapStatus {
-  def apply(
-      loc: BlockManagerId,
-      uncompressedSizes: Array[Long],
-      numOutput: Long): HighlyCompressedMapStatus = {
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
     // We must keep track of which blocks are empty so that we don't report a zero-sized
     // block as being non-empty (or vice-versa) when using the average block size.
     var i = 0
@@ -254,6 +235,6 @@
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
-      hugeBlockSizesArray.toMap, numOutput)
+      hugeBlockSizesArray.toMap)
   }
 }
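To make the restored two-argument factory easier to follow, here is a hedged sketch of the dispatch it performs: below a partition-count threshold, per-block sizes are kept individually; above it, non-empty blocks fall back to an average size. The type and object names below are illustrative stand-ins, not Spark's private[spark] classes, and the threshold value is only a placeholder for what Spark reads from SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.

```scala
// Illustrative stand-ins for CompressedMapStatus / HighlyCompressedMapStatus.
sealed trait StatusSketch { def getSizeForBlock(reduceId: Int): Long }
final case class CompressedSketch(sizes: Array[Long]) extends StatusSketch {
  def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)
}
final case class HighlyCompressedSketch(avgSize: Long) extends StatusSketch {
  def getSizeForBlock(reduceId: Int): Long = avgSize // non-empty blocks reported as an average
}

object MapStatusDispatchSketch extends App {
  val minPartsToHighlyCompress = 2000 // placeholder threshold, not the actual config lookup

  def makeStatus(uncompressedSizes: Array[Long]): StatusSketch =
    if (uncompressedSizes.length > minPartsToHighlyCompress) {
      val nonEmpty = uncompressedSizes.filter(_ > 0)
      HighlyCompressedSketch(if (nonEmpty.isEmpty) 0L else nonEmpty.sum / nonEmpty.length)
    } else {
      CompressedSketch(uncompressedSizes)
    }

  // Small map output => per-block sizes are kept, as in the test suites below.
  println(makeStatus(Array(1000L, 10000L)).getSizeForBlock(1)) // 10000
}
```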

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 1 addition & 2 deletions
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
       val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
       shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
-      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths,
-        writeMetrics.recordsWritten)
+      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")

core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java

Lines changed: 0 additions & 2 deletions
@@ -233,7 +233,6 @@ public void writeEmptyIterator() throws Exception {
     writer.write(Iterators.emptyIterator());
     final Option<MapStatus> mapStatus = writer.stop(true);
     assertTrue(mapStatus.isDefined());
-    assertEquals(0, mapStatus.get().numberOfOutput());
     assertTrue(mergedOutputFile.exists());
     assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
     assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
@@ -253,7 +252,6 @@ public void writeWithoutSpilling() throws Exception {
     writer.write(dataToWrite.iterator());
     final Option<MapStatus> mapStatus = writer.stop(true);
     assertTrue(mapStatus.isDefined());
-    assertEquals(NUM_PARTITITONS, mapStatus.get().numberOfOutput());
     assertTrue(mergedOutputFile.exists());

     long sumOfPartitionSizes = 0;

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 14 additions & 14 deletions
@@ -62,9 +62,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(1000L, 10000L), 10))
+      Array(1000L, 10000L)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-      Array(10000L, 1000L), 10))
+      Array(10000L, 1000L)))
     val statuses = tracker.getMapSizesByExecutorId(10, 0)
     assert(statuses.toSet ===
       Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))),
@@ -84,9 +84,9 @@
     val compressedSize1000 = MapStatus.compressSize(1000L)
     val compressedSize10000 = MapStatus.compressSize(10000L)
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(compressedSize1000, compressedSize10000), 10))
+      Array(compressedSize1000, compressedSize10000)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-      Array(compressedSize10000, compressedSize1000), 10))
+      Array(compressedSize10000, compressedSize1000)))
     assert(tracker.containsShuffle(10))
     assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty)
     assert(0 == tracker.getNumCachedSerializedBroadcast)
@@ -107,9 +107,9 @@
     val compressedSize1000 = MapStatus.compressSize(1000L)
     val compressedSize10000 = MapStatus.compressSize(10000L)
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(compressedSize1000, compressedSize1000, compressedSize1000), 10))
+      Array(compressedSize1000, compressedSize1000, compressedSize1000)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-      Array(compressedSize10000, compressedSize1000, compressedSize1000), 10))
+      Array(compressedSize10000, compressedSize1000, compressedSize1000)))

     assert(0 == tracker.getNumCachedSerializedBroadcast)
     // As if we had two simultaneous fetch failures
@@ -145,7 +145,7 @@

     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     masterTracker.registerMapOutput(10, 0, MapStatus(
-      BlockManagerId("a", "hostA", 1000), Array(1000L), 10))
+      BlockManagerId("a", "hostA", 1000), Array(1000L)))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
       Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
@@ -182,7 +182,7 @@
     // Message size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
     masterTracker.registerMapOutput(10, 0, MapStatus(
-      BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0), 0))
+      BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     val senderAddress = RpcAddress("localhost", 12345)
     val rpcCallContext = mock(classOf[RpcCallContext])
     when(rpcCallContext.senderAddress).thenReturn(senderAddress)
@@ -216,11 +216,11 @@
     // on hostB with output size 3
     tracker.registerShuffle(10, 3)
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(2L), 1))
+      Array(2L)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(2L), 1))
+      Array(2L)))
     tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
-      Array(3L), 1))
+      Array(3L)))

     // When the threshold is 50%, only host A should be returned as a preferred location
     // as it has 4 out of 7 bytes of output.
@@ -260,7 +260,7 @@
     masterTracker.registerShuffle(20, 100)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
-        BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 0))
+        BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     val senderAddress = RpcAddress("localhost", 12345)
     val rpcCallContext = mock(classOf[RpcCallContext])
@@ -309,9 +309,9 @@
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(size0, size1000, size0, size10000), 1))
+      Array(size0, size1000, size0, size10000)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-      Array(size10000, size0, size1000, size0), 1))
+      Array(size10000, size0, size1000, size0)))
     assert(tracker.containsShuffle(10))
     assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
       Seq(

core/src/test/scala/org/apache/spark/ShuffleSuite.scala

Lines changed: 0 additions & 1 deletion
@@ -391,7 +391,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     assert(mapOutput2.isDefined)
     assert(mapOutput1.get.location === mapOutput2.get.location)
     assert(mapOutput1.get.getSizeForBlock(0) === mapOutput1.get.getSizeForBlock(0))
-    assert(mapOutput1.get.numberOfOutput === mapOutput2.get.numberOfOutput)

     // register one of the map outputs -- doesn't matter which one
     mapOutput1.foreach { case mapStatus =>

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -445,17 +445,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // map stage1 completes successfully, with one task on each executor
     complete(taskSets(0), Seq(
       (Success,
-        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success,
-        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success, makeMapStatus("hostB", 1))
     ))
     // map stage2 completes successfully, with one task on each executor
     complete(taskSets(1), Seq(
       (Success,
-        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success,
-        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
+        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
       (Success, makeMapStatus("hostB", 1))
     ))
     // make sure our test setup is correct
@@ -2857,7 +2857,7 @@

 object DAGSchedulerSuite {
   def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
-    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), 1)
+    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))

   def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)
