Commit 7900d19

[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which is wrong: a `SynchronousQueue` has no capacity and cannot cache any task. This patch switches to a `LinkedBlockingQueue`, along with related fixes, so that `newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads and, once that limit is reached, caches additional tasks in the `LinkedBlockingQueue`.

Author: Shixiong Zhu <[email protected]>

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693)
Signed-off-by: Shixiong Zhu <[email protected]>

1 parent b1fcefc · commit 7900d19
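For orientation, here is a minimal standalone sketch, not part of the commit (the object and method names are invented), contrasting the two configurations the message describes. With corePoolSize = 0 and a SynchronousQueue, a submitted task must either be handed to an already-waiting thread or trigger a new thread, so once maxThreadNumber threads are busy the next task is rejected rather than cached. With corePoolSize = maximumPoolSize and an unbounded LinkedBlockingQueue, the overflow is queued, and allowCoreThreadTimeOut(true) still lets idle threads exit after the keep-alive period.

import java.util.concurrent.{LinkedBlockingQueue, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object CachedPoolSketch {
  // Pre-patch shape: SynchronousQueue has zero capacity, so execute() either hands the
  // task to a waiting thread, starts a new thread, or (with maxThreads threads busy) rejects it.
  def rejectingPool(maxThreads: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable])

  // Post-patch shape: core == max, an unbounded LinkedBlockingQueue caches the overflow,
  // and allowCoreThreadTimeOut(true) lets idle threads exit after keepAliveSeconds
  // (which must be > 0 for core-thread timeout to be allowed).
  def queueingPool(maxThreads: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
    pool.allowCoreThreadTimeOut(true)
    pool
  }
}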

File tree

2 files changed: +56 −3 lines


core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 11 additions & 3 deletions

@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**
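As a usage illustration only (the package, object name, and parameters below are made up; newDaemonCachedThreadPool is private[spark], so a real caller has to live under org.apache.spark):

package org.apache.spark.demo // hypothetical subpackage so that private[spark] members are visible

import java.util.concurrent.TimeUnit

import org.apache.spark.util.ThreadUtils

object CachedPoolUsage {
  def main(args: Array[String]): Unit = {
    // At most 8 daemon threads, named "demo-pool-0", "demo-pool-1", ...; idle threads
    // exit after 5 seconds instead of the default 60.
    val pool = ThreadUtils.newDaemonCachedThreadPool("demo-pool", 8, keepAliveSeconds = 5)
    try {
      pool.execute(new Runnable {
        override def run(): Unit = println(s"running on ${Thread.currentThread().getName}")
      })
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}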

core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala

Lines changed: 45 additions & 0 deletions

@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          latch.await(10, TimeUnit.SECONDS)
+        }
+      })
+
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 1)
+
+      latch.countDown()
+      eventually(timeout(10.seconds)) {
+        // All threads should be stopped after keepAliveSeconds
+        assert(cachedThreadPool.getActiveCount === 0)
+        assert(cachedThreadPool.getPoolSize === 0)
+      }
+    } finally {
+      cachedThreadPool.shutdownNow()
+    }
+  }
+
   test("sameThread") {
     val callerThreadName = Thread.currentThread().getName()
     val f = Future {
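For completeness, a small repro sketch of the pre-patch failure mode the new test guards against (not part of the commit; the object name and constants are invented): with the old SynchronousQueue configuration, the task submitted after all maxThreads threads are busy is rejected instead of being queued.

import java.util.concurrent.{CountDownLatch, RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object RejectionRepro {
  def main(args: Array[String]): Unit = {
    val maxThreads = 2
    val blocker = new CountDownLatch(1)
    // Pre-patch configuration: corePoolSize = 0, SynchronousQueue, default AbortPolicy.
    val pool = new ThreadPoolExecutor(
      0, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
    try {
      // Occupy every thread with a task that blocks until the latch is released.
      for (_ <- 1 to maxThreads) {
        pool.execute(new Runnable {
          override def run(): Unit = blocker.await(10, TimeUnit.SECONDS)
        })
      }
      try {
        pool.execute(new Runnable { override def run(): Unit = () })
        println("unexpected: the extra task was accepted")
      } catch {
        case _: RejectedExecutionException =>
          println("rejected: a SynchronousQueue-backed pool cannot cache the extra task")
      }
    } finally {
      blocker.countDown()
      pool.shutdownNow()
    }
  }
}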
