@@ -154,7 +154,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
154154 test(" Job should not complete if all commits are denied" ) {
155155 // Create a mock OutputCommitCoordinator that denies all attempts to commit
156156 doReturn(false ).when(outputCommitCoordinator).handleAskPermissionToCommit(
157- Matchers .any(), Matchers .any(), Matchers .any(), Matchers .any() )
157+ Matchers .any(), Matchers .any(), Matchers .any())
158158 val rdd : RDD [Int ] = sc.parallelize(Seq (1 ), 1 )
159159 def resultHandler (x : Int , y : Unit ): Unit = {}
160160 val futureAction : SimpleFutureAction [Unit ] = sc.submitJob[Int , Unit , Unit ](rdd,
@@ -170,73 +170,65 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
170170
171171 test(" Only authorized committer failures can clear the authorized committer lock (SPARK-6614)" ) {
172172 val stage : Int = 1
173- val stageAttempt : Int = 1
174173 val partition : Int = 2
175- val authorizedCommitter : Int = 3
176- val nonAuthorizedCommitter : Int = 100
174+ val authorizedCommitter : Long = 3
175+ val nonAuthorizedCommitter : Long = 100
177176 outputCommitCoordinator.stageStart(stage, maxPartitionId = 2 )
178177
179- assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter))
180- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
181- nonAuthorizedCommitter))
178+ assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
179+ assert(! outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
182180 // The non-authorized committer fails
183- outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition ,
184- attemptNumber = nonAuthorizedCommitter, reason = TaskKilled (" test" ))
181+ outputCommitCoordinator.taskCompleted(stage, partition, nonAuthorizedCommitter ,
182+ reason = TaskKilled (" test" ))
185183 // New tasks should still not be able to commit because the authorized committer has not failed
186- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
187- nonAuthorizedCommitter + 1 ))
184+ assert(! outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1 ))
188185 // The authorized committer now fails, clearing the lock
189- outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition ,
190- attemptNumber = authorizedCommitter, reason = TaskKilled (" test" ))
186+ outputCommitCoordinator.taskCompleted(stage, partition, authorizedCommitter ,
187+ reason = TaskKilled (" test" ))
191188 // A new task should now be allowed to become the authorized committer
192- assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
193- nonAuthorizedCommitter + 2 ))
189+ assert(outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2 ))
194190 // There can only be one authorized committer
195- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
196- nonAuthorizedCommitter + 3 ))
191+ assert(! outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3 ))
197192 }
198193
199194 test(" SPARK-19631: Do not allow failed attempts to be authorized for committing" ) {
200195 val stage : Int = 1
201- val stageAttempt : Int = 1
202196 val partition : Int = 1
203- val failedAttempt : Int = 0
197+ val failedAttempt : Long = 0L
204198 outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
205- outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
206- attemptNumber = failedAttempt,
199+ outputCommitCoordinator.taskCompleted(stage, partition, failedAttempt,
207200 reason = ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
208- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt))
209- assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt + 1 ))
201+ assert(! outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
202+ assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1 ))
210203 }
211204
212205 test(" SPARK-24589: Differentiate tasks from different stage attempts" ) {
213206 var stage = 1
214- val taskAttempt = 1
215207 val partition = 1
216208
217209 outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
218- assert(outputCommitCoordinator.canCommit(stage, 1 , partition, taskAttempt ))
219- assert(! outputCommitCoordinator.canCommit(stage, 2 , partition, taskAttempt ))
210+ assert(outputCommitCoordinator.canCommit(stage, partition, 1L ))
211+ assert(! outputCommitCoordinator.canCommit(stage, partition, 2L ))
220212
221213 // Fail the task in the first attempt, the task in the second attempt should succeed.
222214 stage += 1
223215 outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
224- outputCommitCoordinator.taskCompleted(stage, 1 , partition, taskAttempt ,
216+ outputCommitCoordinator.taskCompleted(stage, partition, 1L ,
225217 ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
226- assert(! outputCommitCoordinator.canCommit(stage, 1 , partition, taskAttempt ))
227- assert(outputCommitCoordinator.canCommit(stage, 2 , partition, taskAttempt ))
218+ assert(! outputCommitCoordinator.canCommit(stage, partition, 1L ))
219+ assert(outputCommitCoordinator.canCommit(stage, partition, 2L ))
228220
229221 // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
230222 // then fail the 1st attempt and make sure the 4th one can commit again.
231223 stage += 1
232224 outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
233- assert(outputCommitCoordinator.canCommit(stage, 1 , partition, taskAttempt ))
234- outputCommitCoordinator.taskCompleted(stage, 2 , partition, taskAttempt ,
225+ assert(outputCommitCoordinator.canCommit(stage, partition, 1L ))
226+ outputCommitCoordinator.taskCompleted(stage, partition, 2 ,
235227 ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
236- assert(! outputCommitCoordinator.canCommit(stage, 3 , partition, taskAttempt ))
237- outputCommitCoordinator.taskCompleted(stage, 1 , partition, taskAttempt ,
228+ assert(! outputCommitCoordinator.canCommit(stage, partition, 3L ))
229+ outputCommitCoordinator.taskCompleted(stage, partition, 1L ,
238230 ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
239- assert(outputCommitCoordinator.canCommit(stage, 4 , partition, taskAttempt ))
231+ assert(outputCommitCoordinator.canCommit(stage, partition, 4L ))
240232 }
241233
242234 test(" SPARK-24589: Make sure stage state is cleaned up" ) {
0 commit comments