Skip to content

Commit ec8466a

Browse files
committed
Address comment by kiszk
1 parent ecf12bd commit ec8466a

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

core/src/main/scala/org/apache/spark/BarrierCoordinator.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private[spark] class BarrierCoordinator(
101101
// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used
102102
// to identify each barrier() call. It shall get increased when a barrier() call succeeds, or
103103
// reset when a barrier() call fails due to timeout.
104-
private[spark] var barrierEpoch: Int = 0
104+
private var barrierEpoch: Int = 0
105105

106106
// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier()
107107
// call.
@@ -194,6 +194,9 @@ private[spark] class BarrierCoordinator(
194194

195195
// Check for clearing internal data, visible for test only.
196196
private[spark] def cleanCheck(): Boolean = requesters.isEmpty && timerTask == null
197+
198+
// Get currently barrier epoch, visible for test only.
199+
private[spark] def getBarrierEpoch(): Int = barrierEpoch
197200
}
198201

199202
// Clean up the [[ContextBarrierState]] that correspond to a specific stage attempt.

core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext with
5353
eventually(timeout(10.seconds)) {
5454
// Ensure barrierEpoch value have been changed.
5555
val barrierState = getBarrierState(stageId, stageAttemptNumber, barrierCoordinator)
56-
assert(barrierState.barrierEpoch == 1)
56+
assert(barrierState.getBarrierEpoch() == 1)
5757
assert(barrierState.cleanCheck())
5858
}
5959
}
@@ -81,7 +81,7 @@ class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext with
8181
eventually(timeout(10.seconds)) {
8282
// Ensure barrierEpoch value have been changed.
8383
val barrierState = getBarrierState(stageId, stageAttemptNumber, barrierCoordinator)
84-
assert(barrierState.barrierEpoch == 1)
84+
assert(barrierState.getBarrierEpoch() == 1)
8585
assert(barrierState.cleanCheck())
8686
}
8787
}
@@ -124,7 +124,7 @@ class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext with
124124
eventually(timeout(10.seconds)) {
125125
// Ensure barrierEpoch value have been changed.
126126
val barrierState = getBarrierState(stageId, stageAttemptNumber, barrierCoordinator)
127-
assert(barrierState.barrierEpoch == 1)
127+
assert(barrierState.getBarrierEpoch() == 1)
128128
assert(barrierState.cleanCheck())
129129
}
130130
intercept[SparkException](

0 commit comments

Comments
 (0)