
Commit c206fc5 ("minor cleanup")
Parent: e59df41

2 files changed: +7 / -5 lines

core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Lines changed: 2 additions & 2 deletions

@@ -378,8 +378,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext
     // simultaneously, and everything is still OK

     def writeAndClose(
-        writer: ShuffleWriter[Int, Int])(
-        iter: Iterator[(Int, Int)]): Option[(Boolean, MapStatus)] = {
+        writer: ShuffleWriter[Int, Int])(
+        iter: Iterator[(Int, Int)]): Option[(Boolean, MapStatus)] = {
       val files = writer.write(iter)
       val output = writer.stop(true)
       output.map(ShuffleOutputCoordinator.commitOutputs(0, 0, files, _, SparkEnv.get))

Note: the removed and added parameter lines are textually identical, so the only difference between them is leading whitespace; this is an indentation-only change, which fits the "minor cleanup" commit message.
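For context, writeAndClose is curried: the writer is supplied first and the record iterator second, and the result is the optional (committed, MapStatus) pair from the coordinator. A minimal sketch of a call site, assuming a writer obtained elsewhere in the test (the exerciseWriter name and the sample records below are hypothetical, not part of this diff):

// Hypothetical helper showing how the curried writeAndClose is invoked.
def exerciseWriter(writer: ShuffleWriter[Int, Int]): Option[(Boolean, MapStatus)] = {
  // Sample records for one map task; any Iterator[(Int, Int)] works here.
  val records: Iterator[(Int, Int)] = (1 to 5).iterator.map(i => (i, 2 * i))
  // Writes the records, stops the writer, and commits the output through
  // ShuffleOutputCoordinator, exactly as the helper above is defined to do.
  writeAndClose(writer)(records)
}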

core/src/test/scala/org/apache/spark/shuffle/ShuffleOutputCoordinatorSuite.scala
Lines changed: 5 additions & 3 deletions

@@ -31,7 +31,7 @@ class ShuffleOutputCoordinatorSuite extends SparkFunSuite with BeforeAndAfterEach
   var tempDir: File = _
   var mapStatusFile: File = _
   // use the "port" as a way to distinguish mapstatuses, just for the test
-  def mapStatus(id: Int): MapStatus = MapStatus(BlockManagerId("1", "a.b.c", id), Array(0L, 1L))
+  def mapStatus(attemptId: Int): MapStatus = MapStatus(BlockManagerId("1", "a.b.c", attemptId), Array(0L, 1L))
   def ser: SerializerInstance = new JavaSerializer(new SparkConf()).newInstance()

   override def beforeEach(): Unit = {
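The rename from id to attemptId makes the suite's convention explicit: the port field of the BlockManagerId inside each MapStatus is repurposed to record which attempt produced it, so tests can later read the port back to see whose output won. A minimal sketch of that round trip, using only names that appear in this diff:

// The port carries the attempt id purely as a test convenience; nothing in
// the suite treats it as a real network port.
val status: MapStatus = mapStatus(attemptId = 2)
assert(status.location.port === 2)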
@@ -73,14 +73,15 @@ class ShuffleOutputCoordinatorSuite extends SparkFunSuite with BeforeAndAfterEach
     }
   }

-  private def commit(files: Seq[TmpDestShuffleFile], id: Int): (Boolean, MapStatus) = {
-    ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus(id), mapStatusFile, ser)
+  private def commit(files: Seq[TmpDestShuffleFile], attemptId: Int): (Boolean, MapStatus) = {
+    ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus(attemptId), mapStatusFile, ser)
   }

   test("move files if dest missing") {
     val firstAttempt = generateAttempt(0)
     val firstCommit = commit(firstAttempt, 1)
     assert(firstCommit._1)
+    // "port" is just our holder for the attempt that succeeded in this test setup
     assert(firstCommit._2.location.port === 1)
     verifyFiles(0)
     firstAttempt.foreach{ case TmpDestShuffleFile(t, d) => assert(!t.exists())}

@@ -100,6 +101,7 @@ class ShuffleOutputCoordinatorSuite extends SparkFunSuite with BeforeAndAfterEach
     val firstAttempt = generateAttempt(0)
     val firstCommit = commit(firstAttempt, 1)
     assert(firstCommit._1)
+    // "port" is just our holder for the attempt that succeeded in this test setup
     assert(firstCommit._2.location.port === 1)
     verifyFiles(0)
     firstAttempt.foreach{ case TmpDestShuffleFile(t, d) => assert(!t.exists())}
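Putting the pieces together: each attempt passes its attemptId to commit, commit threads it into mapStatus, and the tests read it back via location.port. A hypothetical continuation in the same style (secondAttempt and the attemptId value 2 are assumptions; what Boolean and port a second commit should yield depends on ShuffleOutputCoordinator's rules, so this sketch does not assert them):

// Hypothetical second attempt, mirroring the pattern in the tests above.
val secondAttempt = generateAttempt(1)
val secondCommit = commit(secondAttempt, attemptId = 2)
// secondCommit._1 reports whether this attempt's files were the ones kept;
// secondCommit._2.location.port identifies the attempt whose MapStatus was
// returned. The expected values are left to the coordinator's semantics.
println(s"committed=${secondCommit._1}, winningAttempt=${secondCommit._2.location.port}")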
