|
31 | 31 | import java.util.concurrent.ConcurrentHashMap; |
32 | 32 | import java.util.concurrent.Executors; |
33 | 33 | import java.util.concurrent.RejectedExecutionException; |
34 | | -import java.util.concurrent.RejectedExecutionHandler; |
35 | 34 | import java.util.concurrent.ThreadPoolExecutor; |
36 | 35 | import java.util.concurrent.TimeUnit; |
37 | 36 | import java.util.concurrent.atomic.AtomicInteger; |
@@ -145,15 +144,18 @@ private void createCompactionExecutors() { |
145 | 144 | final String n = Thread.currentThread().getName(); |
146 | 145 |
|
147 | 146 | StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); |
| 147 | + // Since the StealJobQueue inner uses the PriorityBlockingQueue, |
| 148 | + // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for |
| 149 | + // the long and short compaction thread pool executors since HBASE-27332. |
| 150 | + // If anyone who what to change the StealJobQueue to a bounded queue, |
| 151 | + // please add the rejection handler back. |
148 | 152 | this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, |
149 | 153 | stealJobQueue, |
150 | 154 | new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); |
151 | | - this.longCompactions.setRejectedExecutionHandler(new Rejection()); |
152 | 155 | this.longCompactions.prestartAllCoreThreads(); |
153 | 156 | this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, |
154 | 157 | stealJobQueue.getStealFromQueue(), |
155 | 158 | new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); |
156 | | - this.shortCompactions.setRejectedExecutionHandler(new Rejection()); |
157 | 159 | } |
158 | 160 |
|
159 | 161 | @Override |
@@ -724,22 +726,6 @@ private String formatStackTrace(Exception ex) { |
724 | 726 | } |
725 | 727 | } |
726 | 728 |
|
727 | | - /** |
728 | | - * Cleanup class to use when rejecting a compaction request from the queue. |
729 | | - */ |
730 | | - private static class Rejection implements RejectedExecutionHandler { |
731 | | - @Override |
732 | | - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { |
733 | | - if (runnable instanceof CompactionRunner) { |
734 | | - CompactionRunner runner = (CompactionRunner) runnable; |
735 | | - LOG.debug("Compaction Rejected: " + runner); |
736 | | - if (runner.compaction != null) { |
737 | | - runner.store.cancelRequestedCompaction(runner.compaction); |
738 | | - } |
739 | | - } |
740 | | - } |
741 | | - } |
742 | | - |
743 | 729 | /** |
744 | 730 | * {@inheritDoc} |
745 | 731 | */ |
|
0 commit comments