
Commit 8045103

Breaking out into two tests
1 parent 2b4e085 commit 8045103

File tree: 2 files changed, +31 −10 lines


core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 9 additions & 2 deletions

@@ -46,8 +46,15 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
       val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
       val serializedSize = mapOutputStatuses.size
       if (serializedSize > maxAkkaFrameSize) {
-        throw new SparkException(s"Map output statuses were $serializedSize bytes which " +
-          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
+        val msg = s"Map output statuses were $serializedSize bytes which " +
+          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
+        /**
+         * For SPARK-1244 we'll opt for just logging an error and then throwing an exception.
+         * Note that on exception the actor will just restart. A bigger refactoring (SPARK-1239)
+         * will ultimately remove this entire code path.
+         */
+        logError(msg)
+        throw new SparkException(msg)
       }
       sender ! mapOutputStatuses
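The new comment's note that "on exception the actor will just restart" refers to Akka's default supervision behavior: a non-fatal exception thrown inside receive restarts the actor rather than taking the system down, which is why logging the error first matters. A minimal, hypothetical sketch of that behavior (ThrowingActor and RestartDemo are illustrative names, not from this commit; classic-Akka API of this era):

import akka.actor.{Actor, ActorSystem, Props}

class ThrowingActor extends Actor {
  // Called by Akka's default supervision strategy before the restart.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit =
    println(s"restarting after: ${reason.getMessage}")

  def receive = {
    case "boom" => throw new RuntimeException("simulated oversized reply")
    case other  => println(s"handled: $other")
  }
}

object RestartDemo extends App {
  val system = ActorSystem("demo")
  val ref = system.actorOf(Props[ThrowingActor], "thrower")
  ref ! "boom"  // triggers a restart under the default strategy
  ref ! "ping"  // still handled: the actor was restarted, not stopped
  Thread.sleep(1000)
  system.shutdown() // classic-Akka API of this era; newer versions use terminate()
}

Without the logError call, the only trace of the oversized reply would be the exception swallowed by the actor's restart cycle.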

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 22 additions & 8 deletions

@@ -143,7 +143,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
 
-  test("remote fetch exceeding akka frame size") {
+  test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
@@ -154,18 +154,32 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
-    // Frame size should be ~123B, and no exception should be thrown
-    masterTracker.registerShuffle(10, 1)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
-    masterActor.receive(GetMapOutputStatuses(10))
-
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception
+    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
+    // Note that the size is hand-selected here because map output statuses are compressed before
+    // being sent.
     masterTracker.registerShuffle(20, 100)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new MapStatus(
        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
   }
+
+  test("remote fetch below akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
+
+    // Frame size should be ~123B, and no exception should be thrown
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+    masterActor.receive(GetMapOutputStatuses(10))
+  }
 }
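The new test comment says the sizes are "hand-selected" because map output statuses are compressed before being sent, so the raw bytes registered in the test are a poor guide to the serialized frame size. A small, hypothetical sketch of the effect (GZIP standing in for Spark's actual serialization path; CompressionDemo is illustrative, not part of the commit):

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

object CompressionDemo extends App {
  val raw = Array.fill[Byte](4000000)(0)  // same shape as the test's map outputs
  val bos = new ByteArrayOutputStream()
  val gzip = new GZIPOutputStream(bos)
  gzip.write(raw)
  gzip.close()
  println(s"raw: ${raw.length} bytes, compressed: ${bos.size} bytes")
  // Highly regular input compresses by orders of magnitude, so the raw size of
  // the registered outputs says little about the serialized frame size; the test
  // sizes were hand-picked to land under and over the 1 MB frame limit.
}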
