2323import java .io .DataInputStream ;
2424import java .io .DataOutputStream ;
2525import java .io .IOException ;
26+ import java .io .UncheckedIOException ;
2627import java .util .Collections ;
2728import java .util .List ;
2829import java .util .Map ;
29- import java .util .concurrent .ExecutorService ;
30- import java .util .concurrent .Executors ;
31- import java .util .concurrent .TimeUnit ;
30+ import java .util .concurrent .atomic .AtomicInteger ;
31+ import java .util .stream .Stream ;
3232
3333import org .apache .curator .ensemble .fixed .FixedEnsembleProvider ;
3434import org .apache .curator .framework .CuratorFramework ;
3737import org .apache .curator .framework .api .ACLProvider ;
3838import org .apache .curator .framework .imps .DefaultACLProvider ;
3939import org .apache .curator .framework .recipes .cache .ChildData ;
40- import org .apache .curator .framework .recipes .cache .PathChildrenCache ;
41- import org .apache .curator .framework .recipes .cache .PathChildrenCache .StartMode ;
42- import org .apache .curator .framework .recipes .cache .PathChildrenCacheEvent ;
43- import org .apache .curator .framework .recipes .cache .PathChildrenCacheListener ;
40+ import org .apache .curator .framework .recipes .cache .CuratorCache ;
41+ import org .apache .curator .framework .recipes .cache .CuratorCacheBridge ;
42+ import org .apache .curator .framework .recipes .cache .CuratorCacheListener ;
4443import org .apache .curator .framework .recipes .shared .SharedCount ;
4544import org .apache .curator .framework .recipes .shared .VersionedValue ;
4645import org .apache .curator .retry .RetryNTimes ;
@@ -110,7 +109,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
110109 // by default it is still incrementing seq number by 1 each time
111110 public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1 ;
112111
113- private static Logger LOG = LoggerFactory
112+ private static final Logger LOG = LoggerFactory
114113 .getLogger (ZKDelegationTokenSecretManager .class );
115114
116115 private static final String JAAS_LOGIN_ENTRY_NAME =
@@ -136,10 +135,8 @@ public static void setCurator(CuratorFramework curator) {
136135 private final CuratorFramework zkClient ;
137136 private SharedCount delTokSeqCounter ;
138137 private SharedCount keyIdSeqCounter ;
139- private PathChildrenCache keyCache ;
140- private PathChildrenCache tokenCache ;
141- private ExecutorService listenerThreadPool ;
142- private final long shutdownTimeout ;
138+ private CuratorCacheBridge keyCache ;
139+ private CuratorCacheBridge tokenCache ;
143140 private final int seqNumBatchSize ;
144141 private int currentSeqNum ;
145142 private int currentMaxSeqNum ;
@@ -153,8 +150,6 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
153150 DelegationTokenManager .RENEW_INTERVAL_DEFAULT ) * 1000 ,
154151 conf .getLong (DelegationTokenManager .REMOVAL_SCAN_INTERVAL ,
155152 DelegationTokenManager .REMOVAL_SCAN_INTERVAL_DEFAULT ) * 1000 );
156- shutdownTimeout = conf .getLong (ZK_DTSM_ZK_SHUTDOWN_TIMEOUT ,
157- ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT );
158153 seqNumBatchSize = conf .getInt (ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE ,
159154 ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT );
160155 if (CURATOR_TL .get () != null ) {
@@ -266,7 +261,6 @@ public void startThreads() throws IOException {
266261 throw new IOException ("Could not create namespace" , e );
267262 }
268263 }
269- listenerThreadPool = Executors .newSingleThreadExecutor ();
270264 try {
271265 delTokSeqCounter = new SharedCount (zkClient , ZK_DTSM_SEQNUM_ROOT , 0 );
272266 if (delTokSeqCounter != null ) {
@@ -296,98 +290,92 @@ public void startThreads() throws IOException {
296290 throw new RuntimeException ("Could not create ZK paths" );
297291 }
298292 try {
299- keyCache = new PathChildrenCache (zkClient , ZK_DTSM_MASTER_KEY_ROOT , true );
300- if (keyCache != null ) {
301- keyCache .start (StartMode .BUILD_INITIAL_CACHE );
302- keyCache .getListenable ().addListener (new PathChildrenCacheListener () {
303- @ Override
304- public void childEvent (CuratorFramework client ,
305- PathChildrenCacheEvent event )
306- throws Exception {
307- switch (event .getType ()) {
308- case CHILD_ADDED :
309- processKeyAddOrUpdate (event .getData ().getData ());
310- break ;
311- case CHILD_UPDATED :
312- processKeyAddOrUpdate (event .getData ().getData ());
313- break ;
314- case CHILD_REMOVED :
315- processKeyRemoved (event .getData ().getPath ());
316- break ;
317- default :
318- break ;
293+ keyCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_MASTER_KEY_ROOT )
294+ .build ();
295+ CuratorCacheListener keyCacheListener = CuratorCacheListener .builder ()
296+ .forCreatesAndChanges ((oldNode , node ) -> {
297+ try {
298+ processKeyAddOrUpdate (node .getData ());
299+ } catch (IOException e ) {
300+ LOG .error ("Error while processing Curator keyCacheListener "
301+ + "NODE_CREATED / NODE_CHANGED event" );
302+ throw new UncheckedIOException (e );
319303 }
320- }
321- }, listenerThreadPool );
322- loadFromZKCache (false );
323- }
304+ })
305+ .forDeletes (childData -> processKeyRemoved (childData .getPath ()))
306+ .build ();
307+ keyCache .listenable ().addListener (keyCacheListener );
308+ keyCache .start ();
309+ loadFromZKCache (false );
324310 } catch (Exception e ) {
325- throw new IOException ("Could not start PathChildrenCache for keys" , e );
311+ throw new IOException ("Could not start Curator keyCacheListener for keys" ,
312+ e );
326313 }
327314 try {
328- tokenCache = new PathChildrenCache (zkClient , ZK_DTSM_TOKENS_ROOT , true );
329- if (tokenCache != null ) {
330- tokenCache .start (StartMode .BUILD_INITIAL_CACHE );
331- tokenCache .getListenable ().addListener (new PathChildrenCacheListener () {
332-
333- @ Override
334- public void childEvent (CuratorFramework client ,
335- PathChildrenCacheEvent event ) throws Exception {
336- switch (event .getType ()) {
337- case CHILD_ADDED :
338- processTokenAddOrUpdate (event .getData ());
339- break ;
340- case CHILD_UPDATED :
341- processTokenAddOrUpdate (event .getData ());
342- break ;
343- case CHILD_REMOVED :
344- processTokenRemoved (event .getData ());
345- break ;
346- default :
347- break ;
315+ tokenCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_TOKENS_ROOT )
316+ .build ();
317+ CuratorCacheListener tokenCacheListener = CuratorCacheListener .builder ()
318+ .forCreatesAndChanges ((oldNode , node ) -> {
319+ try {
320+ processTokenAddOrUpdate (node .getData ());
321+ } catch (IOException e ) {
322+ LOG .error ("Error while processing Curator tokenCacheListener "
323+ + "NODE_CREATED / NODE_CHANGED event" );
324+ throw new UncheckedIOException (e );
348325 }
349- }
350- }, listenerThreadPool );
351- loadFromZKCache (true );
352- }
326+ })
327+ .forDeletes (childData -> {
328+ try {
329+ processTokenRemoved (childData );
330+ } catch (IOException e ) {
331+ LOG .error ("Error while processing Curator tokenCacheListener "
332+ + "NODE_DELETED event" );
333+ throw new UncheckedIOException (e );
334+ }
335+ })
336+ .build ();
337+ tokenCache .listenable ().addListener (tokenCacheListener );
338+ tokenCache .start ();
339+ loadFromZKCache (true );
353340 } catch (Exception e ) {
354- throw new IOException ("Could not start PathChildrenCache for tokens" , e );
341+ throw new IOException (
342+ "Could not start Curator tokenCacheListener for tokens" , e );
355343 }
356344 super .startThreads ();
357345 }
358346
359347 /**
360- * Load the PathChildrenCache into the in-memory map. Possible caches to be
348+ * Load the CuratorCache into the in-memory map. Possible caches to be
361349 * loaded are keyCache and tokenCache.
362350 *
363351 * @param isTokenCache true if loading tokenCache, false if loading keyCache.
364352 */
365353 private void loadFromZKCache (final boolean isTokenCache ) {
366354 final String cacheName = isTokenCache ? "token" : "key" ;
367355 LOG .info ("Starting to load {} cache." , cacheName );
368- final List <ChildData > children ;
356+ final Stream <ChildData > children ;
369357 if (isTokenCache ) {
370- children = tokenCache .getCurrentData ();
358+ children = tokenCache .stream ();
371359 } else {
372- children = keyCache .getCurrentData ();
360+ children = keyCache .stream ();
373361 }
374362
375- int count = 0 ;
376- for ( ChildData child : children ) {
363+ final AtomicInteger count = new AtomicInteger ( 0 ) ;
364+ children . forEach ( childData -> {
377365 try {
378366 if (isTokenCache ) {
379- processTokenAddOrUpdate (child );
367+ processTokenAddOrUpdate (childData . getData () );
380368 } else {
381- processKeyAddOrUpdate (child .getData ());
369+ processKeyAddOrUpdate (childData .getData ());
382370 }
383371 } catch (Exception e ) {
384372 LOG .info ("Ignoring node {} because it failed to load." ,
385- child .getPath ());
373+ childData .getPath ());
386374 LOG .debug ("Failure exception:" , e );
387- ++ count ;
375+ count . getAndIncrement () ;
388376 }
389- }
390- if (count > 0 ) {
377+ });
378+ if (count . get () > 0 ) {
391379 LOG .warn ("Ignored {} nodes while loading {} cache." , count , cacheName );
392380 }
393381 LOG .info ("Loaded {} cache." , cacheName );
@@ -417,8 +405,8 @@ private void processKeyRemoved(String path) {
417405 }
418406 }
419407
420- private void processTokenAddOrUpdate (ChildData data ) throws IOException {
421- ByteArrayInputStream bin = new ByteArrayInputStream (data . getData () );
408+ private void processTokenAddOrUpdate (byte [] data ) throws IOException {
409+ ByteArrayInputStream bin = new ByteArrayInputStream (data );
422410 DataInputStream din = new DataInputStream (bin );
423411 TokenIdent ident = createIdentifier ();
424412 ident .readFields (din );
@@ -487,20 +475,6 @@ public void stopThreads() {
487475 } catch (Exception e ) {
488476 LOG .error ("Could not stop Curator Framework" , e );
489477 }
490- if (listenerThreadPool != null ) {
491- listenerThreadPool .shutdown ();
492- try {
493- // wait for existing tasks to terminate
494- if (!listenerThreadPool .awaitTermination (shutdownTimeout ,
495- TimeUnit .MILLISECONDS )) {
496- LOG .error ("Forcing Listener threadPool to shutdown !!" );
497- listenerThreadPool .shutdownNow ();
498- }
499- } catch (InterruptedException ie ) {
500- listenerThreadPool .shutdownNow ();
501- Thread .currentThread ().interrupt ();
502- }
503- }
504478 }
505479
506480 private void createPersistentNode (String nodePath ) throws Exception {
@@ -905,11 +879,6 @@ static String getNodePath(String root, String nodeName) {
905879 return (root + "/" + nodeName );
906880 }
907881
908- @ VisibleForTesting
909- public ExecutorService getListenerThreadPool () {
910- return listenerThreadPool ;
911- }
912-
913882 @ VisibleForTesting
914883 DelegationTokenInformation getTokenInfoFromMemory (TokenIdent ident ) {
915884 return currentTokens .get (ident );
0 commit comments