Skip to content

Commit 0b604d9

Browse files
committed
HBASE-24750 : All ExecutorService should use guava ThreadFactoryBuilder
Closes #2196 Signed-off-by: Nick Dimiduk <[email protected]> Signed-off-by: Ted Yu <[email protected]>
1 parent 485e0d2 commit 0b604d9

File tree

39 files changed

+130
-182
lines changed

39 files changed

+130
-182
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.apache.hadoop.conf.Configuration;
3333
import org.apache.hadoop.hbase.Abortable;
3434
import org.apache.hadoop.hbase.errorhandling.ForeignException;
35-
import org.apache.hadoop.hbase.util.Threads;
35+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
3636
import org.apache.yetus.audience.InterfaceAudience;
3737
import org.slf4j.Logger;
3838
import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public LogRollBackupSubprocedurePool(String name, Configuration conf) {
6262
LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
6363
int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
6464
this.name = name;
65-
executor =
66-
new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
67-
new LinkedBlockingQueue<>(),
68-
Threads.newDaemonThreadFactory("rs(" + name + ")-backup"));
65+
executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
66+
new LinkedBlockingQueue<>(),
67+
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d").build());
6968
taskPool = new ExecutorCompletionService<>(executor);
7069
}
7170

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@
5252
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
5353
import org.apache.hadoop.hbase.security.User;
5454
import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
55-
import org.apache.hadoop.hbase.util.Threads;
5655
import org.apache.hadoop.security.UserGroupInformation;
56+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
5757
import org.apache.yetus.audience.InterfaceAudience;
5858
import org.slf4j.Logger;
5959
import org.slf4j.LoggerFactory;
@@ -76,7 +76,8 @@ class AsyncConnectionImpl implements AsyncConnection {
7676

7777
@VisibleForTesting
7878
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
79-
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
79+
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").build(), 10,
80+
TimeUnit.MILLISECONDS);
8081

8182
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
8283

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.hadoop.hbase.ServerName;
3737
import org.apache.hadoop.hbase.util.Addressing;
3838
import org.apache.hadoop.hbase.util.ExceptionUtil;
39-
import org.apache.hadoop.hbase.util.Threads;
39+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4040
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
4141
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
4242
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -178,8 +178,9 @@ public boolean isDeadServer(ServerName sn) {
178178
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
179179
class MulticastListener implements Listener {
180180
private DatagramChannel channel;
181-
private final EventLoopGroup group = new NioEventLoopGroup(
182-
1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
181+
private final EventLoopGroup group = new NioEventLoopGroup(1,
182+
new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
183+
.build());
183184

184185
public MulticastListener() {
185186
}

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
import org.apache.hadoop.hbase.security.UserProvider;
4242
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4343
import org.apache.hadoop.hbase.util.PoolMap;
44-
import org.apache.hadoop.hbase.util.Threads;
4544
import org.apache.hadoop.io.compress.CompressionCodec;
4645
import org.apache.hadoop.ipc.RemoteException;
46+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4747
import org.apache.yetus.audience.InterfaceAudience;
4848
import org.slf4j.Logger;
4949
import org.slf4j.LoggerFactory;
@@ -91,10 +91,12 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
9191
public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
9292

9393
protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
94-
Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
94+
new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").build(), 10,
95+
TimeUnit.MILLISECONDS);
9596

9697
private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
97-
.newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
98+
.newScheduledThreadPool(1,
99+
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").build());
98100

99101
protected boolean running = true; // if client runs
100102

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
3434
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
3535
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
36-
import org.apache.hadoop.hbase.util.Threads;
3736
import org.apache.hadoop.security.UserGroupInformation;
37+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
3838
import org.apache.yetus.audience.InterfaceAudience;
3939
import org.slf4j.Logger;
4040
import org.slf4j.LoggerFactory;
@@ -76,8 +76,9 @@ class NettyRpcConnection extends RpcConnection {
7676

7777
private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
7878

79-
private static final ScheduledExecutorService RELOGIN_EXECUTOR =
80-
Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
79+
private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
80+
.newSingleThreadScheduledExecutor(
81+
new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d").build());
8182

8283
private final NettyRpcClient rpcClient;
8384

hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java

Lines changed: 2 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import org.apache.hadoop.util.ReflectionUtils;
3535
import org.apache.hadoop.util.StringUtils;
36+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
3637
import org.apache.yetus.audience.InterfaceAudience;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
@@ -197,83 +198,14 @@ public static ThreadPoolExecutor getBoundedCachedThreadPool(
197198
return boundedCachedThreadPool;
198199
}
199200

200-
public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout,
201-
TimeUnit unit, String prefix) {
202-
return getBoundedCachedThreadPool(maxCachedThread, timeout, unit,
203-
newDaemonThreadFactory(prefix));
204-
}
205-
206-
/**
207-
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
208-
* with a common prefix.
209-
* @param prefix The prefix of every created Thread's name
210-
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
211-
*/
212-
public static ThreadFactory getNamedThreadFactory(final String prefix) {
213-
SecurityManager s = System.getSecurityManager();
214-
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
215-
.getThreadGroup();
216-
217-
return new ThreadFactory() {
218-
final AtomicInteger threadNumber = new AtomicInteger(1);
219-
private final int poolNumber = Threads.poolNumber.getAndIncrement();
220-
final ThreadGroup group = threadGroup;
221-
222-
@Override
223-
public Thread newThread(Runnable r) {
224-
final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
225-
return new Thread(group, r, name);
226-
}
227-
};
228-
}
229-
230-
/**
231-
* Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
232-
* without setting the exception handler.
233-
*/
234-
public static ThreadFactory newDaemonThreadFactory(final String prefix) {
235-
return newDaemonThreadFactory(prefix, null);
236-
}
237-
238-
/**
239-
* Get a named {@link ThreadFactory} that just builds daemon threads.
240-
* @param prefix name prefix for all threads created from the factory
241-
* @param handler unhandles exception handler to set for all threads
242-
* @return a thread factory that creates named, daemon threads with
243-
* the supplied exception handler and normal priority
244-
*/
245-
public static ThreadFactory newDaemonThreadFactory(final String prefix,
246-
final UncaughtExceptionHandler handler) {
247-
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
248-
return new ThreadFactory() {
249-
@Override
250-
public Thread newThread(Runnable r) {
251-
Thread t = namedFactory.newThread(r);
252-
if (handler != null) {
253-
t.setUncaughtExceptionHandler(handler);
254-
} else {
255-
t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
256-
}
257-
if (!t.isDaemon()) {
258-
t.setDaemon(true);
259-
}
260-
if (t.getPriority() != Thread.NORM_PRIORITY) {
261-
t.setPriority(Thread.NORM_PRIORITY);
262-
}
263-
return t;
264-
}
265-
266-
};
267-
}
268-
269201
/** Sets an UncaughtExceptionHandler for the thread which logs the
270202
* Exception stack if the thread dies.
271203
*/
272204
public static void setLoggingUncaughtExceptionHandler(Thread t) {
273205
t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
274206
}
275207

276-
private static interface PrintThreadInfoHelper {
208+
private interface PrintThreadInfoHelper {
277209

278210
void printThreadInfo(PrintStream stream, String title);
279211

hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@
3434
import org.apache.hadoop.hbase.client.Get;
3535
import org.apache.hadoop.hbase.client.Put;
3636
import org.apache.hadoop.hbase.util.Bytes;
37-
import org.apache.hadoop.hbase.util.Threads;
3837
import org.apache.hadoop.util.Tool;
3938
import org.apache.hadoop.util.ToolRunner;
39+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4040
import org.apache.yetus.audience.InterfaceAudience;
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
@@ -130,7 +130,7 @@ public int run(String[] args) throws Exception {
130130
TableName tableName = TableName.valueOf(args[0]);
131131
int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
132132
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
133-
Threads.newDaemonThreadFactory("AsyncClientExample"));
133+
new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d").build());
134134
// We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
135135
// need a thread pool and may have a better performance if you use it correctly as it can save
136136
// some context switches. But if you use RawAsyncTable incorrectly, you may have a very bad

hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/policies/TwoConcurrentActionPolicy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import org.apache.hadoop.hbase.chaos.actions.Action;
2222
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
23-
import org.apache.hadoop.hbase.util.Threads;
2423
import org.apache.hadoop.util.StringUtils;
24+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
2525

2626
import java.util.concurrent.ExecutionException;
2727
import java.util.concurrent.ExecutorService;
@@ -42,7 +42,7 @@ public TwoConcurrentActionPolicy(long sleepTime, Action[] actionsOne, Action[] a
4242
this.actionsOne = actionsOne;
4343
this.actionsTwo = actionsTwo;
4444
executor = Executors.newFixedThreadPool(2,
45-
Threads.newDaemonThreadFactory("TwoConcurrentAction"));
45+
new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d").build());
4646
}
4747

4848
@Override

hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
3737
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
3838
import org.apache.hadoop.hbase.util.Threads;
39+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
3940
import org.apache.yetus.audience.InterfaceAudience;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
@@ -100,8 +101,8 @@ public boolean start() {
100101

101102
// Create the thread pool that will execute RPCs
102103
threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
103-
Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
104-
getUncaughtExceptionHandler()));
104+
new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
105+
.setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
105106
return true;
106107
}
107108

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.atomic.AtomicInteger;
2727
import org.apache.hadoop.conf.Configuration;
28-
import org.apache.hadoop.hbase.util.Threads;
28+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
2929
import org.apache.yetus.audience.InterfaceAudience;
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
@@ -60,14 +60,10 @@ public void init(Context context) {
6060
public void start() {
6161
LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
6262
this.getClass().getSimpleName(), handlerCount, maxQueueLength);
63-
this.executor = new ThreadPoolExecutor(
64-
handlerCount,
65-
handlerCount,
66-
60,
67-
TimeUnit.SECONDS,
68-
new ArrayBlockingQueue<>(maxQueueLength),
69-
Threads.newDaemonThreadFactory("FifoRpcScheduler.handler"),
70-
new ThreadPoolExecutor.CallerRunsPolicy());
63+
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
64+
new ArrayBlockingQueue<>(maxQueueLength),
65+
new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").build(),
66+
new ThreadPoolExecutor.CallerRunsPolicy());
7167
}
7268

7369
@Override

0 commit comments

Comments (0)