@@ -62,29 +62,35 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
6262 * The fail variable controls if we expect migration to fail, in which case we expect
6363 * a constant Long.MaxValue timestamp.
6464 */
65- private def validateWithMocks (conf : SparkConf , bm : BlockManager ,
65+ private def validateDecommissionTimestamps (conf : SparkConf , bm : BlockManager ,
6666 migratableShuffleBlockResolver : MigratableResolver , fail : Boolean = false ) = {
67- // Verify the decom manager handles this correctly
67+ // Verify the decommissioning manager timestamps and status
6868 val bmDecomManager = new BlockManagerDecommissioner (conf, bm)
69- var previousTime = Long . MaxValue
69+ var previousTime : Option [ Long ] = None
7070 try {
7171 bmDecomManager.start()
72- eventually(timeout(10 .second), interval(10 .milliseconds)) {
72+ eventually(timeout(100 .second), interval(10 .milliseconds)) {
7373 val (currentTime, done) = bmDecomManager.lastMigrationInfo()
7474 assert(done)
7575 // Make sure the time stamp starts moving forward.
76- if (! fail && previousTime > currentTime) {
77- previousTime = currentTime
78- assert(false )
79- } else if (fail) {
76+ if (! fail) {
77+ previousTime match {
78+ case None =>
79+ previousTime = Some (currentTime)
80+ assert(false )
81+ case Some (t) =>
82+ assert(t < currentTime)
83+ }
84+ } else {
85+ // If we expect migration to fail we should get the max value quickly.
8086 assert(currentTime === Long .MaxValue )
8187 }
8288 }
8389 if (! fail) {
8490 // Wait 5 seconds and assert times keep moving forward.
8591 Thread .sleep(5000 )
8692 val (currentTime, done) = bmDecomManager.lastMigrationInfo()
87- assert(done && currentTime > previousTime)
93+ assert(done && currentTime > previousTime.get )
8894 }
8995 } finally {
9096 bmDecomManager.stop()
@@ -103,9 +109,8 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
103109 when(bm.getPeers(mc.any()))
104110 .thenReturn(Seq (BlockManagerId (" exec2" , " host2" , 12345 )))
105111
106-
107112 // Verify the decom manager handles this correctly
108- validateWithMocks (sparkConf, bm, migratableShuffleBlockResolver)
113+ validateDecommissionTimestamps (sparkConf, bm, migratableShuffleBlockResolver)
109114 }
110115
111116 test(" block decom manager with no migrations configured" ) {
@@ -123,7 +128,8 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
123128 .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED , false )
124129 .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL , 10L )
125130 // Verify the decom manager handles this correctly
126- validateWithMocks(badConf, bm, migratableShuffleBlockResolver, fail = true )
131+ validateDecommissionTimestamps(badConf, bm, migratableShuffleBlockResolver,
132+ fail = true )
127133 }
128134
129135 test(" block decom manager with no peers" ) {
@@ -138,7 +144,8 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
138144 .thenReturn(Seq ())
139145
140146 // Verify the decom manager handles this correctly
141- validateWithMocks(sparkConf, bm, migratableShuffleBlockResolver, fail = true )
147+ validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver,
148+ fail = true )
142149 }
143150
144151
@@ -154,7 +161,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
154161 .thenReturn(Seq (BlockManagerId (" exec2" , " host2" , 12345 )))
155162
156163 // Verify the decom manager handles this correctly
157- validateWithMocks (sparkConf, bm, migratableShuffleBlockResolver)
164+ validateDecommissionTimestamps (sparkConf, bm, migratableShuffleBlockResolver)
158165 }
159166
160167 test(" test shuffle and cached rdd migration without any error" ) {
0 commit comments