diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java index c4b135ff256a..cac141cc1393 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java @@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.thrift2.generated.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConnectionCache; +import org.apache.thrift.TBaseHelper; import org.apache.thrift.TException; import com.google.common.cache.Cache; @@ -116,10 +119,14 @@ private static long now() { return System.nanoTime(); } + ThriftHBaseServiceHandler(Configuration conf, UserProvider userProvider) throws IOException + { + this(conf, userProvider, conf.getInt(MAX_IDLETIME, 10 * 60 * 1000)); + } + ThriftHBaseServiceHandler(final Configuration conf, - final UserProvider userProvider) throws IOException { + final UserProvider userProvider, int maxIdleTime) throws IOException { int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); - int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); connectionCache = new ConnectionCache( conf, userProvider, cleanInterval, maxIdleTime); tableFactory = new HTableFactory() { @@ -133,8 +140,20 @@ public HTableInterface createHTableInterface(Configuration config, } } }; + RemovalListener removalListener = new RemovalListener() { + @Override + public void onRemoval(RemovalNotification removal){ + HTablePool pool = (HTablePool)removal.getValue(); + try { + pool.close(); + } catch (IOException ex) { + ThriftHBaseServiceHandler.LOG.error("Failed to close HTablePool for user=" + (String)removal.getKey(), ex); + } + } + }; + htablePools = CacheBuilder.newBuilder().expireAfterAccess( - maxIdleTime, TimeUnit.MILLISECONDS).softValues().concurrencyLevel(4).build(); + maxIdleTime, TimeUnit.MILLISECONDS).removalListener(removalListener).softValues().concurrencyLevel(4).build(); maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000); htablePoolCreater = new Callable() { public HTablePool call() { @@ -143,7 +162,16 @@ public HTablePool call() { }; } + public void refreshConnection(String tableName) throws IOException { + this.connectionCache.getTable(tableName); + } + private HTableInterface getTable(ByteBuffer tableName) { + try { + refreshConnection(Bytes.toString(TBaseHelper.byteBufferToByteArray(tableName))); + } catch (IOException ex) { + LOG.warn("Failed to refresh connection cache.", ex); + } String currentUser = connectionCache.getEffectiveUser(); try { HTablePool htablePool = htablePools.get(currentUser, htablePoolCreater);