|
31 | 31 | import java.util.concurrent.ConcurrentHashMap; |
32 | 32 | import java.util.concurrent.Executors; |
33 | 33 | import java.util.concurrent.RejectedExecutionException; |
34 | | -import java.util.concurrent.RejectedExecutionHandler; |
35 | 34 | import java.util.concurrent.ThreadPoolExecutor; |
36 | 35 | import java.util.concurrent.TimeUnit; |
37 | 36 | import java.util.concurrent.atomic.AtomicInteger; |
@@ -145,15 +144,15 @@ private void createCompactionExecutors() { |
145 | 144 | final String n = Thread.currentThread().getName(); |
146 | 145 |
|
147 | 146 | StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); |
| 147 | + // Since the StealJobQueue is unbounded, we need not to set the RejectedExecutionHandler for |
| 148 | + // the long and short compaction thread pool executors. |
148 | 149 | this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, |
149 | 150 | stealJobQueue, |
150 | 151 | new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); |
151 | | - this.longCompactions.setRejectedExecutionHandler(new Rejection()); |
152 | 152 | this.longCompactions.prestartAllCoreThreads(); |
153 | 153 | this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, |
154 | 154 | stealJobQueue.getStealFromQueue(), |
155 | 155 | new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); |
156 | | - this.shortCompactions.setRejectedExecutionHandler(new Rejection()); |
157 | 156 | } |
158 | 157 |
|
159 | 158 | @Override |
@@ -382,15 +381,20 @@ protected void requestCompactionInternal(HRegion region, HStore store, String wh |
382 | 381 | // pool; we will do selection there, and move to large pool if necessary. |
383 | 382 | pool = shortCompactions; |
384 | 383 | } |
385 | | - pool.execute( |
386 | | - new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); |
| 384 | + |
| 385 | + // A simple implementation for under compaction marks. |
| 386 | + // Since this method is always called in the synchronized methods, we do not need to use the |
| 387 | + // boolean result to make sure that exactly the one that added here will be removed |
| 388 | + // in the next steps. |
| 389 | + underCompactionStores.add(getStoreNameForUnderCompaction(store)); |
387 | 390 | if (LOG.isDebugEnabled()) { |
388 | 391 | LOG.debug( |
389 | 392 | "Add compact mark for store {}, priority={}, current under compaction " |
390 | 393 | + "store size is {}", |
391 | 394 | getStoreNameForUnderCompaction(store), priority, underCompactionStores.size()); |
392 | 395 | } |
393 | | - underCompactionStores.add(getStoreNameForUnderCompaction(store)); |
| 396 | + pool.submit( |
| 397 | + new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); |
394 | 398 | region.incrementCompactionsQueuedCount(); |
395 | 399 | if (LOG.isDebugEnabled()) { |
396 | 400 | String type = (pool == shortCompactions) ? "Small " : "Large "; |
@@ -719,22 +723,6 @@ private String formatStackTrace(Exception ex) { |
719 | 723 | } |
720 | 724 | } |
721 | 725 |
|
722 | | - /** |
723 | | - * Cleanup class to use when rejecting a compaction request from the queue. |
724 | | - */ |
725 | | - private static class Rejection implements RejectedExecutionHandler { |
726 | | - @Override |
727 | | - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { |
728 | | - if (runnable instanceof CompactionRunner) { |
729 | | - CompactionRunner runner = (CompactionRunner) runnable; |
730 | | - LOG.debug("Compaction Rejected: " + runner); |
731 | | - if (runner.compaction != null) { |
732 | | - runner.store.cancelRequestedCompaction(runner.compaction); |
733 | | - } |
734 | | - } |
735 | | - } |
736 | | - } |
737 | | - |
738 | 726 | /** |
739 | 727 | * {@inheritDoc} |
740 | 728 | */ |
|
0 commit comments