File tree: 1 file changed (+15, −14 lines).
Changed file: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
Original file line number | Diff line number | Diff line change
@@ -83,24 +83,25 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
8383 val input = sc.parallelize(1 to numParts, numParts)
8484 val accum = sc.collectionAccumulator[String ](" mapperRunAccumulator" )
8585
86+ val sleepIntervalMs = whenToDecom match {
87+ // Increase the window of time b/w task started and ended so that we can decom within that.
88+ case TaskStarted => 2000
89+ // Make one task take a really short time so that we can decommission right after it is
90+ // done but before its peers are done.
91+ case TaskEnded =>
92+ if (TaskContext .getPartitionId() == 0 ) {
93+ 100
94+ } else {
95+ 1000
96+ }
97+ // No sleep otherwise
98+ case _ => 0
99+ }
100+
86101 // Create a new RDD where we have sleep in each partition, we are also increasing
87102 // the value of accumulator in each partition
88103 val baseRdd = input.mapPartitions { x =>
89104 accum.add(SparkEnv .get.executorId)
90- val sleepIntervalMs = whenToDecom match {
91- // Increase the window of time b/w task started and ended so that we can decom within that.
92- case TaskStarted => 2000
93- // Make one task take a really short time so that we can decommission right after it is
94- // done but before its peers are done.
95- case TaskEnded =>
96- if (TaskContext .getPartitionId() == 0 ) {
97- 100
98- } else {
99- 1000
100- }
101- // No sleep otherwise
102- case _ => 0
103- }
104105 if (sleepIntervalMs > 0 ) {
105106 Thread .sleep(sleepIntervalMs)
106107 }
You can’t perform that action at this time.
0 commit comments