Skip to content

Commit bdf4d69

Browse files
committed
Remove some un-needed semaphores
Now that we wait for an actual task to succeed, we don't need to wait for events prior to that: broadcast of job-info finished and task started. The waiting for the task end/success subsumes that. Simplifying the test even further.
1 parent c3e3932 commit bdf4d69

File tree

1 file changed

+0
-16
lines changed

1 file changed

+0
-16
lines changed

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,6 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
8888
}
8989

9090
// Listen for the job & block updates
91-
val taskStartSem = new Semaphore(0)
92-
val broadcastSem = new Semaphore(0)
9391
val executorRemovedSem = new Semaphore(0)
9492
val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
9593
val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
@@ -102,20 +100,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
102100
executorRemovedSem.release()
103101
}
104102

105-
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
106-
taskStartSem.release()
107-
}
108-
109103
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
110104
taskEndEvents.add(taskEnd)
111105
}
112106

113107
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
114-
// Once broadcast start landing on the executors we're good to proceed.
115-
// We don't only use task start as it can occur before the work is on the executor.
116-
if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
117-
broadcastSem.release()
118-
}
119108
blocksUpdated.append(blockUpdated)
120109
}
121110
})
@@ -129,11 +118,6 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
129118
// Start the computation of RDD - this step will also cache the RDD
130119
val asyncCount = testRdd.countAsync()
131120

132-
// Wait for the job to have started.
133-
taskStartSem.acquire(1)
134-
// Wait for each executor + driver to have it's broadcast info delivered.
135-
broadcastSem.acquire((numExecs + 1))
136-
137121
// Make sure the job is either mid run or otherwise has data to migrate.
138122
if (migrateDuring) {
139123
// Wait for one of the tasks to succeed and finish writing its blocks.

0 commit comments

Comments
 (0)