4242import  org .apache .zookeeper .ZooDefs ;
4343import  org .apache .zookeeper .ZooKeeper ;
4444import  org .apache .zookeeper .ZooKeeper .States ;
45+ import  org .apache .zookeeper .client .ZKClientConfig ;
4546import  org .apache .zookeeper .data .ACL ;
4647import  org .apache .zookeeper .data .Stat ;
4748import  org .apache .zookeeper .proto .CreateRequest ;
5051import  org .slf4j .LoggerFactory ;
5152
5253/** 
53-  * A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need 
54-  * to realize that there are two classes of requests: idempotent and non-idempotent requests. Read 
55-  * requests and unconditional sets and deletes are examples of idempotent requests, they can be 
56-  * reissued with the same results. (Although, the delete may throw a NoNodeException on reissue its 
57-  * effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling, 
58-  * application and library writers need to keep in mind that they may need to encode information in 
59-  * the data or name of znodes to detect retries. A simple example is a create that uses a sequence 
60-  * flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception, 
61-  * that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the 
62-  * process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 
63-  * was the result of the previous create, so the process actually owns both x-109 and x-111. An easy 
64-  * way around this is to use "x-process id-" when doing the create. If the process is using an id of 
65-  * 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 
54+  * A zookeeper that can handle 'recoverable' errors. 
55+  * <p> 
56+  * To handle recoverable errors, developers need to realize that there are two classes of requests: 
57+  * idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are 
58+  * examples of idempotent requests, they can be reissued with the same results. 
59+  * <p> 
60+  * (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is 
61+  * the same.) Non-idempotent requests need special handling, application and library writers need to 
62+  * keep in mind that they may need to encode information in the data or name of znodes to detect 
63+  * retries. A simple example is a create that uses a sequence flag. If a process issues a 
64+  * create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue 
65+  * another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a 
66+  * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result 
67+  * of the previous create, so the process actually owns both x-109 and x-111. An easy way around 
68+  * this is to use "x-process id-" when doing the create. If the process is using an id of 352, 
69+  * before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 
6670 * "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it 
6771 * created is "x-352-109". 
6872 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" 
@@ -80,37 +84,31 @@ public class RecoverableZooKeeper {
8084  private  final  int  sessionTimeout ;
8185  private  final  String  quorumServers ;
8286  private  final  int  maxMultiSize ;
87+   private  final  ZKClientConfig  zkClientConfig ;
8388
8489  /** 
85-    * See {@link #connect(Configuration, String, Watcher, String)}  
90+    * See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}.  
8691   */ 
8792  public  static  RecoverableZooKeeper  connect (Configuration  conf , Watcher  watcher )
8893    throws  IOException  {
8994    String  ensemble  = ZKConfig .getZKQuorumServersString (conf );
90-     return  connect (conf , ensemble , watcher );
91-   }
92- 
93-   /** 
94-    * See {@link #connect(Configuration, String, Watcher, String)} 
95-    */ 
96-   public  static  RecoverableZooKeeper  connect (Configuration  conf , String  ensemble , Watcher  watcher )
97-     throws  IOException  {
98-     return  connect (conf , ensemble , watcher , null );
95+     return  connect (conf , ensemble , watcher , null , null );
9996  }
10097
10198  /** 
10299   * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified 
103100   * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring 
104101   * watcher to the specified watcher. 
105-    * @param conf       configuration to pull ensemble and other settings from 
106-    * @param watcher    watcher to monitor connection changes 
107-    * @param ensemble   ZooKeeper servers quorum string 
108-    * @param identifier value used to identify this client instance. 
102+    * @param conf           configuration to pull ensemble and other settings from 
103+    * @param watcher        watcher to monitor connection changes 
104+    * @param ensemble       ZooKeeper servers quorum string 
105+    * @param identifier     value used to identify this client instance. 
106+    * @param zkClientConfig client specific configurations for this instance 
109107   * @return connection to zookeeper 
110108   * @throws IOException if unable to connect to zk or config problem 
111109   */ 
112110  public  static  RecoverableZooKeeper  connect (Configuration  conf , String  ensemble , Watcher  watcher ,
113-     final  String  identifier ) throws  IOException  {
111+     final  String  identifier ,  ZKClientConfig   zkClientConfig ) throws  IOException  {
114112    if  (ensemble  == null ) {
115113      throw  new  IOException ("Unable to determine ZooKeeper ensemble" );
116114    }
@@ -123,14 +121,12 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
123121    int  maxSleepTime  = conf .getInt ("zookeeper.recovery.retry.maxsleeptime" , 60000 );
124122    int  multiMaxSize  = conf .getInt ("zookeeper.multi.max.size" , 1024  * 1024 );
125123    return  new  RecoverableZooKeeper (ensemble , timeout , watcher , retry , retryIntervalMillis ,
126-       maxSleepTime , identifier , multiMaxSize );
124+       maxSleepTime , identifier , multiMaxSize ,  zkClientConfig );
127125  }
128126
129-   @ edu .umd .cs .findbugs .annotations .SuppressWarnings (value  = "DE_MIGHT_IGNORE" ,
130-       justification  = "None. Its always been this way." )
131-   public  RecoverableZooKeeper (String  quorumServers , int  sessionTimeout , Watcher  watcher ,
132-     int  maxRetries , int  retryIntervalMillis , int  maxSleepTime , String  identifier , int  maxMultiSize )
133-     throws  IOException  {
127+   RecoverableZooKeeper (String  quorumServers , int  sessionTimeout , Watcher  watcher , int  maxRetries ,
128+     int  retryIntervalMillis , int  maxSleepTime , String  identifier , int  maxMultiSize ,
129+     ZKClientConfig  zkClientConfig ) throws  IOException  {
134130    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 
135131    this .retryCounterFactory  =
136132      new  RetryCounterFactory (maxRetries  + 1 , retryIntervalMillis , maxSleepTime );
@@ -148,12 +144,7 @@ public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher wa
148144    this .sessionTimeout  = sessionTimeout ;
149145    this .quorumServers  = quorumServers ;
150146    this .maxMultiSize  = maxMultiSize ;
151- 
152-     try  {
153-       checkZk ();
154-     } catch  (Exception  x ) {
155-       /* ignore */ 
156-     }
147+     this .zkClientConfig  = zkClientConfig ;
157148  }
158149
159150  /** 
@@ -172,10 +163,10 @@ public int getMaxMultiSizeLimit() {
172163   * @return The created ZooKeeper connection object 
173164   * @throws KeeperException if a ZooKeeper operation fails 
174165   */ 
175-   protected  synchronized  ZooKeeper  checkZk () throws  KeeperException  {
166+   private  synchronized  ZooKeeper  checkZk () throws  KeeperException  {
176167    if  (this .zk  == null ) {
177168      try  {
178-         this .zk  = new  ZooKeeper (quorumServers , sessionTimeout , watcher );
169+         this .zk  = new  ZooKeeper (quorumServers , sessionTimeout , watcher ,  zkClientConfig );
179170      } catch  (IOException  ex ) {
180171        LOG .warn ("Unable to create ZooKeeper Connection" , ex );
181172        throw  new  KeeperException .OperationTimeoutException ();
0 commit comments