2323import java .io .DataInputStream ;
2424import java .io .DataOutputStream ;
2525import java .io .IOException ;
26+ import java .io .UncheckedIOException ;
2627import java .util .Collections ;
2728import java .util .HashMap ;
2829import java .util .List ;
2930import java .util .Map ;
30- import java .util .concurrent .ExecutorService ;
31- import java .util .concurrent .Executors ;
32- import java .util .concurrent .TimeUnit ;
31+ import java .util .concurrent .atomic .AtomicInteger ;
32+ import java .util .stream .Stream ;
3333
3434import javax .security .auth .login .AppConfigurationEntry ;
3535
4040import org .apache .curator .framework .api .ACLProvider ;
4141import org .apache .curator .framework .imps .DefaultACLProvider ;
4242import org .apache .curator .framework .recipes .cache .ChildData ;
43- import org .apache .curator .framework .recipes .cache .PathChildrenCache ;
44- import org .apache .curator .framework .recipes .cache .PathChildrenCache .StartMode ;
45- import org .apache .curator .framework .recipes .cache .PathChildrenCacheEvent ;
46- import org .apache .curator .framework .recipes .cache .PathChildrenCacheListener ;
43+ import org .apache .curator .framework .recipes .cache .CuratorCache ;
44+ import org .apache .curator .framework .recipes .cache .CuratorCacheBridge ;
45+ import org .apache .curator .framework .recipes .cache .CuratorCacheListener ;
4746import org .apache .curator .framework .recipes .shared .SharedCount ;
4847import org .apache .curator .framework .recipes .shared .VersionedValue ;
4948import org .apache .curator .retry .RetryNTimes ;
@@ -113,7 +112,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
113112 // by default it is still incrementing seq number by 1 each time
114113 public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1 ;
115114
116- private static Logger LOG = LoggerFactory
115+ private static final Logger LOG = LoggerFactory
117116 .getLogger (ZKDelegationTokenSecretManager .class );
118117
119118 private static final String JAAS_LOGIN_ENTRY_NAME =
@@ -139,10 +138,8 @@ public static void setCurator(CuratorFramework curator) {
139138 protected final CuratorFramework zkClient ;
140139 private SharedCount delTokSeqCounter ;
141140 private SharedCount keyIdSeqCounter ;
142- private PathChildrenCache keyCache ;
143- private PathChildrenCache tokenCache ;
144- private ExecutorService listenerThreadPool ;
145- private final long shutdownTimeout ;
141+ private CuratorCacheBridge keyCache ;
142+ private CuratorCacheBridge tokenCache ;
146143 private final int seqNumBatchSize ;
147144 private int currentSeqNum ;
148145 private int currentMaxSeqNum ;
@@ -158,8 +155,6 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
158155 DelegationTokenManager .RENEW_INTERVAL_DEFAULT ) * 1000 ,
159156 conf .getLong (DelegationTokenManager .REMOVAL_SCAN_INTERVAL ,
160157 DelegationTokenManager .REMOVAL_SCAN_INTERVAL_DEFAULT ) * 1000 );
161- shutdownTimeout = conf .getLong (ZK_DTSM_ZK_SHUTDOWN_TIMEOUT ,
162- ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT );
163158 seqNumBatchSize = conf .getInt (ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE ,
164159 ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT );
165160 isTokenWatcherEnabled = conf .getBoolean (ZK_DTSM_TOKEN_WATCHER_ENABLED ,
@@ -333,7 +328,6 @@ public void startThreads() throws IOException {
333328 throw new IOException ("Could not create namespace" , e );
334329 }
335330 }
336- listenerThreadPool = Executors .newSingleThreadExecutor ();
337331 try {
338332 delTokSeqCounter = new SharedCount (zkClient , ZK_DTSM_SEQNUM_ROOT , 0 );
339333 if (delTokSeqCounter != null ) {
@@ -363,105 +357,122 @@ public void startThreads() throws IOException {
363357 throw new RuntimeException ("Could not create ZK paths" );
364358 }
365359 try {
366- keyCache = new PathChildrenCache (zkClient , ZK_DTSM_MASTER_KEY_ROOT , true );
360+ keyCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_MASTER_KEY_ROOT )
361+ .build ();
367362 if (keyCache != null ) {
368- keyCache .start (StartMode .BUILD_INITIAL_CACHE );
369- keyCache .getListenable ().addListener (new PathChildrenCacheListener () {
370- @ Override
371- public void childEvent (CuratorFramework client ,
372- PathChildrenCacheEvent event )
373- throws Exception {
374- switch (event .getType ()) {
375- case CHILD_ADDED :
376- processKeyAddOrUpdate (event .getData ().getData ());
377- break ;
378- case CHILD_UPDATED :
379- processKeyAddOrUpdate (event .getData ().getData ());
380- break ;
381- case CHILD_REMOVED :
382- processKeyRemoved (event .getData ().getPath ());
383- break ;
384- default :
385- break ;
386- }
387- }
388- }, listenerThreadPool );
363+ CuratorCacheListener keyCacheListener = CuratorCacheListener .builder ()
364+ .forCreates (childData -> {
365+ try {
366+ processKeyAddOrUpdate (childData .getData ());
367+ } catch (IOException e ) {
368+ LOG .error ("Error while processing Curator keyCacheListener "
369+ + "NODE_CREATED event" );
370+ throw new UncheckedIOException (e );
371+ }
372+ })
373+ .forChanges ((oldNode , node ) -> {
374+ try {
375+ processKeyAddOrUpdate (node .getData ());
376+ } catch (IOException e ) {
377+ LOG .error ("Error while processing Curator keyCacheListener "
378+ + "NODE_CHANGED event" );
379+ throw new UncheckedIOException (e );
380+ }
381+ })
382+ .forDeletes (childData -> processKeyRemoved (childData .getPath ()))
383+ .build ();
384+ keyCache .listenable ().addListener (keyCacheListener );
385+ keyCache .start ();
389386 loadFromZKCache (false );
390387 }
391388 } catch (Exception e ) {
392- throw new IOException ("Could not start PathChildrenCache for keys" , e );
389+ throw new IOException ("Could not start Curator keyCacheListener for keys" ,
390+ e );
393391 }
394392 if (isTokenWatcherEnabled ) {
395393 LOG .info ("TokenCache is enabled" );
396394 try {
397- tokenCache = new PathChildrenCache (zkClient , ZK_DTSM_TOKENS_ROOT , true );
395+ tokenCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_TOKENS_ROOT )
396+ .build ();
398397 if (tokenCache != null ) {
399- tokenCache .start (StartMode .BUILD_INITIAL_CACHE );
400- tokenCache .getListenable ().addListener (new PathChildrenCacheListener () {
401-
402- @ Override
403- public void childEvent (CuratorFramework client ,
404- PathChildrenCacheEvent event ) throws Exception {
405- switch (event .getType ()) {
406- case CHILD_ADDED :
407- processTokenAddOrUpdate (event .getData ().getData ());
408- break ;
409- case CHILD_UPDATED :
410- processTokenAddOrUpdate (event .getData ().getData ());
411- break ;
412- case CHILD_REMOVED :
413- processTokenRemoved (event .getData ());
414- break ;
415- default :
416- break ;
417- }
418- }
419- }, listenerThreadPool );
398+ CuratorCacheListener tokenCacheListener = CuratorCacheListener .builder ()
399+ .forCreates (childData -> {
400+ try {
401+ processTokenAddOrUpdate (childData .getData ());
402+ } catch (IOException e ) {
403+ LOG .error ("Error while processing Curator tokenCacheListener "
404+ + "NODE_CREATED event" );
405+ throw new UncheckedIOException (e );
406+ }
407+ })
408+ .forChanges ((oldNode , node ) -> {
409+ try {
410+ processTokenAddOrUpdate (node .getData ());
411+ } catch (IOException e ) {
412+ LOG .error ("Error while processing Curator tokenCacheListener "
413+ + "NODE_CHANGED event" );
414+ throw new UncheckedIOException (e );
415+ }
416+ })
417+ .forDeletes (childData -> {
418+ try {
419+ processTokenRemoved (childData );
420+ } catch (IOException e ) {
421+ LOG .error ("Error while processing Curator tokenCacheListener "
422+ + "NODE_DELETED event" );
423+ throw new UncheckedIOException (e );
424+ }
425+ })
426+ .build ();
427+ tokenCache .listenable ().addListener (tokenCacheListener );
428+ tokenCache .start ();
420429 loadFromZKCache (true );
421430 }
422431 } catch (Exception e ) {
423- throw new IOException ("Could not start PathChildrenCache for tokens" , e );
432+ throw new IOException (
433+ "Could not start Curator tokenCacheListener for tokens" , e );
424434 }
425435 }
426436 super .startThreads ();
427437 }
428438
429439 /**
430- * Load the PathChildrenCache into the in-memory map. Possible caches to be
440+ * Load the CuratorCache into the in-memory map. Possible caches to be
431441 * loaded are keyCache and tokenCache.
432442 *
433443 * @param isTokenCache true if loading tokenCache, false if loading keyCache.
434444 */
435445 private void loadFromZKCache (final boolean isTokenCache ) {
436446 final String cacheName = isTokenCache ? "token" : "key" ;
437447 LOG .info ("Starting to load {} cache." , cacheName );
438- final List <ChildData > children ;
448+ final Stream <ChildData > children ;
439449 if (isTokenCache ) {
440- children = tokenCache .getCurrentData ();
450+ children = tokenCache .stream ();
441451 } else {
442- children = keyCache .getCurrentData ();
452+ children = keyCache .stream ();
443453 }
444454
445- int count = 0 ;
446- for ( ChildData child : children ) {
455+ final AtomicInteger count = new AtomicInteger ( 0 ) ;
456+ children . forEach ( childData -> {
447457 try {
448458 if (isTokenCache ) {
449- processTokenAddOrUpdate (child .getData ());
459+ processTokenAddOrUpdate (childData .getData ());
450460 } else {
451- processKeyAddOrUpdate (child .getData ());
461+ processKeyAddOrUpdate (childData .getData ());
452462 }
453463 } catch (Exception e ) {
454464 LOG .info ("Ignoring node {} because it failed to load." ,
455- child .getPath ());
465+ childData .getPath ());
456466 LOG .debug ("Failure exception:" , e );
457- ++ count ;
467+ count . getAndIncrement () ;
458468 }
459- }
469+ });
460470 if (isTokenCache ) {
461471 syncTokenOwnerStats ();
462472 }
463- if (count > 0 ) {
464- LOG .warn ("Ignored {} nodes while loading {} cache." , count , cacheName );
473+ if (count .get () > 0 ) {
474+ LOG .warn ("Ignored {} nodes while loading {} cache." , count .get (),
475+ cacheName );
465476 }
466477 LOG .info ("Loaded {} cache." , cacheName );
467478 }
@@ -550,20 +561,6 @@ public void stopThreads() {
550561 } catch (Exception e ) {
551562 LOG .error ("Could not stop Curator Framework" , e );
552563 }
553- if (listenerThreadPool != null ) {
554- listenerThreadPool .shutdown ();
555- try {
556- // wait for existing tasks to terminate
557- if (!listenerThreadPool .awaitTermination (shutdownTimeout ,
558- TimeUnit .MILLISECONDS )) {
559- LOG .error ("Forcing Listener threadPool to shutdown !!" );
560- listenerThreadPool .shutdownNow ();
561- }
562- } catch (InterruptedException ie ) {
563- listenerThreadPool .shutdownNow ();
564- Thread .currentThread ().interrupt ();
565- }
566- }
567564 }
568565
569566 private void createPersistentNode (String nodePath ) throws Exception {
@@ -992,11 +989,6 @@ static String getNodePath(String root, String nodeName) {
992989 return (root + "/" + nodeName );
993990 }
994991
995- @ VisibleForTesting
996- public ExecutorService getListenerThreadPool () {
997- return listenerThreadPool ;
998- }
999-
1000992 @ VisibleForTesting
1001993 DelegationTokenInformation getTokenInfoFromMemory (TokenIdent ident ) {
1002994 return currentTokens .get (ident );
0 commit comments