diff --git a/hbase-external-blockcache/pom.xml b/hbase-external-blockcache/pom.xml index f04c7e2bed47..3b862aec4436 100644 --- a/hbase-external-blockcache/pom.xml +++ b/hbase-external-blockcache/pom.xml @@ -91,12 +91,6 @@ test-jar test - - org.apache.hbase - hbase-zookeeper - test-jar - test - org.apache.hbase hbase-server @@ -104,9 +98,8 @@ test - com.thimbleware.jmemcached - jmemcached-core - 1.0.0 + org.mockito + mockito-core test diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index 2e8811e93a78..efb8b97bccbd 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import net.spy.memcached.CachedData; +import net.spy.memcached.ConnectionFactory; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.FailureMode; import net.spy.memcached.MemcachedClient; @@ -122,11 +123,16 @@ public MemcachedBlockCache(Configuration c) throws IOException { serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s)); } - client = new MemcachedClient(builder.build(), serverAddresses); + client = createMemcachedClient(builder.build(), serverAddresses); this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, STAT_THREAD_PERIOD, TimeUnit.SECONDS); } + protected MemcachedClient createMemcachedClient(ConnectionFactory factory, + List serverAddresses) throws IOException { + return new MemcachedClient(factory, serverAddresses); + } + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { cacheBlock(cacheKey, buf); diff --git a/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java b/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java index f1b6af7d3cdf..96ff85e8414c 100644 --- a/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java +++ b/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java @@ -18,26 +18,39 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import com.thimbleware.jmemcached.CacheElement; -import com.thimbleware.jmemcached.CacheImpl; -import com.thimbleware.jmemcached.Key; -import com.thimbleware.jmemcached.LocalCacheElement; -import com.thimbleware.jmemcached.MemCacheDaemon; -import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap; -import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap.EvictionPolicy; +import java.io.IOException; import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; +import net.spy.memcached.CachedData; +import net.spy.memcached.ConnectionFactory; +import net.spy.memcached.FailureMode; +import net.spy.memcached.MemcachedClient; +import net.spy.memcached.internal.OperationFuture; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.transcoders.Transcoder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -49,76 +62,94 @@ public class TestMemcachedBlockCache { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMemcachedBlockCache.class); - static MemCacheDaemon MEMCACHED; - static MemcachedBlockCache CACHE; + private MemcachedBlockCache cache; - @Before - public void before() throws Exception { - MEMCACHED.getCache().flush_all(); - assertEquals("Memcache is not empty", MEMCACHED.getCache().getCurrentItems(), 0); - } + private ConcurrentMap backingMap; - @BeforeClass - public static void setup() throws Exception { - int port = HBaseTestingUtility.randomFreePort(); - MEMCACHED = createDaemon(port); + @Before + public void setup() throws Exception { + int port = ThreadLocalRandom.current().nextInt(1024, 65536); Configuration conf = new Configuration(); conf.set("hbase.cache.memcached.servers", "localhost:" + port); - CACHE = new MemcachedBlockCache(conf); - } + backingMap = new ConcurrentHashMap<>(); + cache = new MemcachedBlockCache(conf) { - @AfterClass - public static void tearDown() throws Exception { - if (MEMCACHED != null) { - MEMCACHED.stop(); - } + private OperationFuture createFuture(String key, long opTimeout, T result) { + OperationFuture future = + new OperationFuture<>(key, new CountDownLatch(0), opTimeout, ForkJoinPool.commonPool()); + Operation op = mock(Operation.class); + when(op.getState()).thenReturn(OperationState.COMPLETE); + future.setOperation(op); + future.set(result, new OperationStatus(true, "")); + + return future; + } + + @Override + protected MemcachedClient createMemcachedClient(ConnectionFactory factory, + List serverAddresses) throws IOException { + assertEquals(FailureMode.Redistribute, factory.getFailureMode()); + assertTrue(factory.isDaemon()); + assertFalse(factory.useNagleAlgorithm()); + assertEquals(MAX_SIZE, factory.getReadBufSize()); + assertEquals(1, serverAddresses.size()); + assertEquals("localhost", serverAddresses.get(0).getHostName()); + assertEquals(port, serverAddresses.get(0).getPort()); + MemcachedClient client = mock(MemcachedClient.class); + when(client.set(anyString(), anyInt(), any(), any())).then(inv -> { + String key = inv.getArgument(0); + HFileBlock block = inv.getArgument(2); + Transcoder tc = inv.getArgument(3); + CachedData cd = tc.encode(block); + backingMap.put(key, cd); + return createFuture(key, factory.getOperationTimeout(), true); + }); + when(client.delete(anyString())).then(inv -> { + String key = inv.getArgument(0); + backingMap.remove(key); + return createFuture(key, factory.getOperationTimeout(), true); + }); + when(client.get(anyString(), any())).then(inv -> { + String key = inv.getArgument(0); + Transcoder tc = inv.getArgument(1); + CachedData cd = backingMap.get(key); + return tc.decode(cd); + }); + return client; + } + }; } @Test public void testCache() throws Exception { - final int NUM_BLOCKS = 10; + final int numBlocks = 10; HFileBlockPair[] blocks = - CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS); - for (int i = 0; i < NUM_BLOCKS; i++) { - CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); + CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks); + for (int i = 0; i < numBlocks; i++) { + cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); + } + Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks); + for (int i = 0; i < numBlocks; i++) { + HFileBlock actual = (HFileBlock) cache.getBlock(blocks[i].getBlockName(), false, false, true); + HFileBlock expected = blocks[i].getBlock(); + assertEquals(expected.getBlockType(), actual.getBlockType()); + assertEquals(expected.getSerializedLength(), actual.getSerializedLength()); } - Waiter.waitFor(new Configuration(), 10000, - () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS); } @Test public void testEviction() throws Exception { - final int NUM_BLOCKS = 10; + final int numBlocks = 10; HFileBlockPair[] blocks = - CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS); - for (int i = 0; i < NUM_BLOCKS; i++) { - CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); - } - Waiter.waitFor(new Configuration(), 10000, - () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS); - for (int i = 0; i < NUM_BLOCKS; i++) { - CACHE.evictBlock(blocks[i].getBlockName()); + CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks); + for (int i = 0; i < numBlocks; i++) { + cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); } - Waiter.waitFor(new Configuration(), 10000, () -> MEMCACHED.getCache().getCurrentItems() == 0); - } - - private static MemCacheDaemon createDaemon(int port) { - InetSocketAddress addr = new InetSocketAddress("localhost", port); - MemCacheDaemon daemon = new MemCacheDaemon(); - ConcurrentLinkedHashMap cacheStorage = - ConcurrentLinkedHashMap.create(EvictionPolicy.LRU, 1000, 1024 * 1024); - daemon.setCache(new CacheImpl(cacheStorage)); - daemon.setAddr(addr); - daemon.setVerbose(true); - daemon.start(); - while (!daemon.isRunning()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks); + for (int i = 0; i < numBlocks; i++) { + cache.evictBlock(blocks[i].getBlockName()); } - return daemon; + Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == 0); } } diff --git a/pom.xml b/pom.xml index 65c74b7c4a57..e363abb6399f 100644 --- a/pom.xml +++ b/pom.xml @@ -624,7 +624,7 @@ 2.2.1 1.0.58 - 2.12.2 + 2.12.3 1.78 1.5.1 1.0.1