1818package org .apache .hadoop .hbase .io .hfile ;
1919
2020import static org .junit .Assert .assertEquals ;
21+ import static org .junit .Assert .assertFalse ;
22+ import static org .junit .Assert .assertTrue ;
23+ import static org .mockito .ArgumentMatchers .any ;
24+ import static org .mockito .ArgumentMatchers .anyInt ;
25+ import static org .mockito .ArgumentMatchers .anyString ;
26+ import static org .mockito .Mockito .mock ;
27+ import static org .mockito .Mockito .when ;
2128
22- import com .thimbleware .jmemcached .CacheElement ;
23- import com .thimbleware .jmemcached .CacheImpl ;
24- import com .thimbleware .jmemcached .Key ;
25- import com .thimbleware .jmemcached .LocalCacheElement ;
26- import com .thimbleware .jmemcached .MemCacheDaemon ;
27- import com .thimbleware .jmemcached .storage .hash .ConcurrentLinkedHashMap ;
28- import com .thimbleware .jmemcached .storage .hash .ConcurrentLinkedHashMap .EvictionPolicy ;
29+ import java .io .IOException ;
2930import java .net .InetSocketAddress ;
31+ import java .util .List ;
32+ import java .util .concurrent .ConcurrentHashMap ;
33+ import java .util .concurrent .ConcurrentMap ;
34+ import java .util .concurrent .CountDownLatch ;
35+ import java .util .concurrent .ForkJoinPool ;
36+ import java .util .concurrent .ThreadLocalRandom ;
37+ import net .spy .memcached .CachedData ;
38+ import net .spy .memcached .ConnectionFactory ;
39+ import net .spy .memcached .FailureMode ;
40+ import net .spy .memcached .MemcachedClient ;
41+ import net .spy .memcached .internal .OperationFuture ;
42+ import net .spy .memcached .ops .Operation ;
43+ import net .spy .memcached .ops .OperationState ;
44+ import net .spy .memcached .ops .OperationStatus ;
45+ import net .spy .memcached .transcoders .Transcoder ;
3046import org .apache .hadoop .conf .Configuration ;
3147import org .apache .hadoop .hbase .HBaseClassTestRule ;
32- import org .apache .hadoop .hbase .HBaseTestingUtil ;
3348import org .apache .hadoop .hbase .HConstants ;
3449import org .apache .hadoop .hbase .Waiter ;
3550import org .apache .hadoop .hbase .io .hfile .CacheTestUtils .HFileBlockPair ;
3651import org .apache .hadoop .hbase .testclassification .IOTests ;
3752import org .apache .hadoop .hbase .testclassification .SmallTests ;
38- import org .junit .AfterClass ;
3953import org .junit .Before ;
40- import org .junit .BeforeClass ;
4154import org .junit .ClassRule ;
4255import org .junit .Test ;
4356import org .junit .experimental .categories .Category ;
@@ -49,76 +62,94 @@ public class TestMemcachedBlockCache {
4962 public static final HBaseClassTestRule CLASS_RULE =
5063 HBaseClassTestRule .forClass (TestMemcachedBlockCache .class );
5164
52- static MemCacheDaemon <? extends CacheElement > MEMCACHED ;
53- static MemcachedBlockCache CACHE ;
65+ private MemcachedBlockCache cache ;
5466
55- @ Before
56- public void before () throws Exception {
57- MEMCACHED .getCache ().flush_all ();
58- assertEquals ("Memcache is not empty" , MEMCACHED .getCache ().getCurrentItems (), 0 );
59- }
67+ private ConcurrentMap <String , CachedData > backingMap ;
6068
61- @ BeforeClass
62- public static void setup () throws Exception {
63- int port = HBaseTestingUtil .randomFreePort ();
64- MEMCACHED = createDaemon (port );
69+ @ Before
70+ public void setup () throws Exception {
71+ int port = ThreadLocalRandom .current ().nextInt (1024 , 65536 );
6572 Configuration conf = new Configuration ();
6673 conf .set ("hbase.cache.memcached.servers" , "localhost:" + port );
67- CACHE = new MemcachedBlockCache ( conf );
68- }
74+ backingMap = new ConcurrentHashMap <>( );
75+ cache = new MemcachedBlockCache ( conf ) {
6976
70- @ AfterClass
71- public static void tearDown () throws Exception {
72- if (MEMCACHED != null ) {
73- MEMCACHED .stop ();
74- }
77+ private <T > OperationFuture <T > createFuture (String key , long opTimeout , T result ) {
78+ OperationFuture <T > future =
79+ new OperationFuture <>(key , new CountDownLatch (0 ), opTimeout , ForkJoinPool .commonPool ());
80+ Operation op = mock (Operation .class );
81+ when (op .getState ()).thenReturn (OperationState .COMPLETE );
82+ future .setOperation (op );
83+ future .set (result , new OperationStatus (true , "" ));
84+
85+ return future ;
86+ }
87+
88+ @ Override
89+ protected MemcachedClient createMemcachedClient (ConnectionFactory factory ,
90+ List <InetSocketAddress > serverAddresses ) throws IOException {
91+ assertEquals (FailureMode .Redistribute , factory .getFailureMode ());
92+ assertTrue (factory .isDaemon ());
93+ assertFalse (factory .useNagleAlgorithm ());
94+ assertEquals (MAX_SIZE , factory .getReadBufSize ());
95+ assertEquals (1 , serverAddresses .size ());
96+ assertEquals ("localhost" , serverAddresses .get (0 ).getHostName ());
97+ assertEquals (port , serverAddresses .get (0 ).getPort ());
98+ MemcachedClient client = mock (MemcachedClient .class );
99+ when (client .set (anyString (), anyInt (), any (), any ())).then (inv -> {
100+ String key = inv .getArgument (0 );
101+ HFileBlock block = inv .getArgument (2 );
102+ Transcoder <HFileBlock > tc = inv .getArgument (3 );
103+ CachedData cd = tc .encode (block );
104+ backingMap .put (key , cd );
105+ return createFuture (key , factory .getOperationTimeout (), true );
106+ });
107+ when (client .delete (anyString ())).then (inv -> {
108+ String key = inv .getArgument (0 );
109+ backingMap .remove (key );
110+ return createFuture (key , factory .getOperationTimeout (), true );
111+ });
112+ when (client .get (anyString (), any ())).then (inv -> {
113+ String key = inv .getArgument (0 );
114+ Transcoder <HFileBlock > tc = inv .getArgument (1 );
115+ CachedData cd = backingMap .get (key );
116+ return tc .decode (cd );
117+ });
118+ return client ;
119+ }
120+ };
75121 }
76122
77123 @ Test
78124 public void testCache () throws Exception {
79- final int NUM_BLOCKS = 10 ;
125+ final int numBlocks = 10 ;
80126 HFileBlockPair [] blocks =
81- CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , NUM_BLOCKS );
82- for (int i = 0 ; i < NUM_BLOCKS ; i ++) {
83- CACHE .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
127+ CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , numBlocks );
128+ for (int i = 0 ; i < numBlocks ; i ++) {
129+ cache .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
130+ }
131+ Waiter .waitFor (new Configuration (), 10000 , () -> backingMap .size () == numBlocks );
132+ for (int i = 0 ; i < numBlocks ; i ++) {
133+ HFileBlock actual = (HFileBlock ) cache .getBlock (blocks [i ].getBlockName (), false , false , true );
134+ HFileBlock expected = blocks [i ].getBlock ();
135+ assertEquals (expected .getBlockType (), actual .getBlockType ());
136+ assertEquals (expected .getSerializedLength (), actual .getSerializedLength ());
84137 }
85- Waiter .waitFor (new Configuration (), 10000 ,
86- () -> MEMCACHED .getCache ().getCurrentItems () == NUM_BLOCKS );
87138 }
88139
89140 @ Test
90141 public void testEviction () throws Exception {
91- final int NUM_BLOCKS = 10 ;
142+ final int numBlocks = 10 ;
92143 HFileBlockPair [] blocks =
93- CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , NUM_BLOCKS );
94- for (int i = 0 ; i < NUM_BLOCKS ; i ++) {
95- CACHE .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
96- }
97- Waiter .waitFor (new Configuration (), 10000 ,
98- () -> MEMCACHED .getCache ().getCurrentItems () == NUM_BLOCKS );
99- for (int i = 0 ; i < NUM_BLOCKS ; i ++) {
100- CACHE .evictBlock (blocks [i ].getBlockName ());
144+ CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , numBlocks );
145+ for (int i = 0 ; i < numBlocks ; i ++) {
146+ cache .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
101147 }
102- Waiter .waitFor (new Configuration (), 10000 , () -> MEMCACHED .getCache ().getCurrentItems () == 0 );
103- }
104-
105- private static MemCacheDaemon <? extends CacheElement > createDaemon (int port ) {
106- InetSocketAddress addr = new InetSocketAddress ("localhost" , port );
107- MemCacheDaemon <LocalCacheElement > daemon = new MemCacheDaemon <LocalCacheElement >();
108- ConcurrentLinkedHashMap <Key , LocalCacheElement > cacheStorage =
109- ConcurrentLinkedHashMap .create (EvictionPolicy .LRU , 1000 , 1024 * 1024 );
110- daemon .setCache (new CacheImpl (cacheStorage ));
111- daemon .setAddr (addr );
112- daemon .setVerbose (true );
113- daemon .start ();
114- while (!daemon .isRunning ()) {
115- try {
116- Thread .sleep (100 );
117- } catch (InterruptedException e ) {
118- Thread .currentThread ().interrupt ();
119- }
148+ Waiter .waitFor (new Configuration (), 10000 , () -> backingMap .size () == numBlocks );
149+ for (int i = 0 ; i < numBlocks ; i ++) {
150+ cache .evictBlock (blocks [i ].getBlockName ());
120151 }
121- return daemon ;
152+ Waiter . waitFor ( new Configuration (), 10000 , () -> backingMap . size () == 0 ) ;
122153 }
123154
124155}
0 commit comments